28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/logstash/inputs/processor.rb', line 28
def onEvents(context, batch)
@logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} is processing a batch of size #{batch.size}.") if @logger.debug?
last_payload = nil
batch_size = 0
batch.each do |payload|
last_checkpoint = Time.now.to_i
bytes = payload.getBytes
batch_size += bytes.size
@logger.trace("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s}, Offset: #{payload.getSystemProperties.getOffset.to_s},"+
" Sequence: #{payload.getSystemProperties.getSequenceNumber.to_s}, Size: #{bytes.size}") if @logger.trace?
@codec.decode(bytes.to_a.pack('C*')) do |event|
@decorator.call(event)
if @meta_data
event.set("[@metadata][azure_event_hubs][name]", context.getEventHubPath)
event.set("[@metadata][azure_event_hubs][consumer_group]", context.getConsumerGroupName)
event.set("[@metadata][azure_event_hubs][processor_host]", context.getOwner)
event.set("[@metadata][azure_event_hubs][partition]", context.getPartitionId)
event.set("[@metadata][azure_event_hubs][offset]", payload.getSystemProperties.getOffset)
event.set("[@metadata][azure_event_hubs][sequence]", payload.getSystemProperties.getSequenceNumber)
event.set("[@metadata][azure_event_hubs][timestamp]",payload.getSystemProperties.getEnqueuedTime.getEpochSecond)
event.set("[@metadata][azure_event_hubs][event_size]", bytes.size)
event.set("[@metadata][azure_event_hubs][user_properties]", payload.getProperties)
end
@queue << event
if @checkpoint_interval > 0
now = Time.now.to_i
since_last_check_point = now - last_checkpoint
if since_last_check_point >= @checkpoint_interval
context.checkpoint(payload).get
last_checkpoint = now
end
end
end
last_payload = payload
end
@codec.flush
context.checkpoint(last_payload).get if last_payload
@logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} finished processing a batch of #{batch_size} bytes.") if @logger.debug?
end
|