Class: FluQ::Input::Kafka
- Inherits:
-
Base
- Object
- Base
- FluQ::Input::Kafka
- Defined in:
- lib/fluq/input/kafka.rb
Instance Method Summary collapse
-
#description ⇒ String
Descriptive name.
-
#initialize ⇒ Kafka
constructor
Constructor.
-
#name ⇒ String
Short name.
-
#process(partition, messages) ⇒ Object
Processes messages.
-
#run ⇒ Object
Start the loop.
Constructor Details
#initialize ⇒ Kafka
Constructor.
29 30 31 |
# File 'lib/fluq/input/kafka.rb', line 29 def initialize(*) super end |
Instance Method Details
#description ⇒ String
Returns descriptive name.
39 40 41 |
# File 'lib/fluq/input/kafka.rb', line 39 def description "#{name} (#{config[:group]} <- #{config[:brokers].join(',')})" end |
#name ⇒ String
Returns short name.
34 35 36 |
# File 'lib/fluq/input/kafka.rb', line 34 def name "kafka:#{config[:topic]}" end |
#process(partition, messages) ⇒ Object
Processes messages
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluq/input/kafka.rb', line 55 def process(partition, ) events = [] .each do |m| events.concat format.parse(m.value) end events.each do |event| event..update topic: config[:topic], partition: partition end worker.process events end |
#run ⇒ Object
Start the loop
44 45 46 47 48 49 50 |
# File 'lib/fluq/input/kafka.rb', line 44 def run super consumer.fetch_loop do |partition, bulk| process partition, bulk end end |