Class: LogStash::Inputs::Azure::Processor

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/inputs/processor.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, codec, checkpoint_interval, decorator, meta_data) ⇒ Processor

Returns a new instance of Processor.



10
11
12
13
14
15
16
17
18
# File 'lib/logstash/inputs/processor.rb', line 10

# Builds a processor bound to a single Event Hub partition lease.
#
# queue               - the Logstash input queue that decoded events are pushed onto
# codec               - codec used to turn raw payload bytes into Logstash events
# checkpoint_interval - seconds between interval-based offset checkpoints
#                       (<= 0 disables interval checkpointing; a final
#                       checkpoint is still written per batch in onEvents)
# decorator           - callable invoked on every decoded event (adds tags/fields)
# meta_data           - when truthy, onEvents attaches [@metadata][azure_event_hubs]
#                       provenance fields to each event
def initialize(queue, codec, checkpoint_interval, decorator, meta_data)
  @queue = queue
  @codec = codec
  @checkpoint_interval = checkpoint_interval
  @decorator = decorator
  @meta_data = meta_data
  # Util::Loggable supplies the logger; cache it for the processing callbacks.
  @logger = self.logger
end

Instance Method Details

#onClose(context, reason) ⇒ Object



24
25
26
# File 'lib/logstash/inputs/processor.rb', line 24

# Event Processor Host lifecycle hook: called when this partition lease is
# being released (e.g. shutdown or lease stolen); logs the hub, partition,
# and the reason the host gave for closing.
def onClose(context, reason)
  hub = context.getEventHubPath
  partition = context.getPartitionId
  @logger.info("Event Hub: #{hub}, Partition: #{partition} is closing. (reason=#{reason})")
end

#onError(context, error) ⇒ Object



72
73
74
# File 'lib/logstash/inputs/processor.rb', line 72

# Event Processor Host lifecycle hook: called when the underlying client
# reports an error for this partition; records it at error level.
# (The stray ')' at the end of the message is preserved from the original.)
def onError(context, error)
  hub = context.getEventHubPath
  partition = context.getPartitionId
  @logger.error("Event Hub: #{hub}, Partition: #{partition} experienced an error #{error})")
end

#onEvents(context, batch) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/inputs/processor.rb', line 28

# Event Processor Host callback: receives a batch of EventData for one
# partition, decodes every payload through the configured codec, enqueues the
# resulting Logstash events, and checkpoints the partition offset.
#
# context - per-partition context (Java proxy) supplying metadata accessors
#           and checkpoint(payload), which returns a future
# batch   - enumerable of EventData payloads for this partition
def onEvents(context, batch)
  @logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} is processing a batch of size #{batch.size}.") if @logger.debug?
  last_payload = nil
  batch_size = 0
  batch.each do |payload|
    # NOTE(review): last_checkpoint is re-initialized for every payload, so the
    # interval-based checkpoint below can only fire if decoding a single
    # payload takes >= @checkpoint_interval seconds — confirm this is the
    # intended checkpoint cadence.
    last_checkpoint = Time.now.to_i
    bytes = payload.getBytes
    batch_size += bytes.size
    @logger.trace("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s}, Offset: #{payload.getSystemProperties.getOffset.to_s},"+
                      " Sequence: #{payload.getSystemProperties.getSequenceNumber.to_s}, Size: #{bytes.size}") if @logger.trace?

    # Re-pack the (signed Java) byte array into a binary Ruby string for the
    # codec; the codec may yield zero, one, or many events per payload.
    @codec.decode(bytes.to_a.pack('C*')) do |event|

      @decorator.call(event)
      # Attach Event Hub provenance under [@metadata][azure_event_hubs] when
      # the plugin was configured with decorate_events.
      if @meta_data
        event.set("[@metadata][azure_event_hubs][name]", context.getEventHubPath)
        event.set("[@metadata][azure_event_hubs][consumer_group]", context.getConsumerGroupName)
        event.set("[@metadata][azure_event_hubs][processor_host]", context.getOwner)
        event.set("[@metadata][azure_event_hubs][partition]", context.getPartitionId)
        event.set("[@metadata][azure_event_hubs][offset]", payload.getSystemProperties.getOffset)
        event.set("[@metadata][azure_event_hubs][sequence]", payload.getSystemProperties.getSequenceNumber)
        event.set("[@metadata][azure_event_hubs][timestamp]",payload.getSystemProperties.getEnqueuedTime.getEpochSecond)
        event.set("[@metadata][azure_event_hubs][event_size]", bytes.size)
        event.set("[@metadata][azure_event_hubs][user_properties]", payload.getProperties)
      end
      @queue << event
      # Interval checkpoint: persist the current offset and block (.get) until
      # the checkpoint future completes, so progress is durable before moving on.
      if @checkpoint_interval > 0
        now = Time.now.to_i
        since_last_check_point = now - last_checkpoint
        if since_last_check_point >= @checkpoint_interval
          context.checkpoint(payload).get
          last_checkpoint = now
        end
      end
    end
    last_payload = payload
  end

  # Flush any event the codec is still buffering across payload boundaries.
  @codec.flush
  #always create checkpoint at end of onEvents in case of sparse events
  context.checkpoint(last_payload).get if last_payload
  @logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} finished processing a batch of #{batch_size} bytes.") if @logger.debug?
end

#onOpen(context) ⇒ Object



20
21
22
# File 'lib/logstash/inputs/processor.rb', line 20

# Event Processor Host lifecycle hook: called once this processor acquires
# a partition lease; logs which hub/partition is now being consumed.
def onOpen(context)
  @logger.info(format('Event Hub: %s, Partition: %s is opening.', context.getEventHubPath, context.getPartitionId))
end