Class: FluQ::Input::Kafka

Inherits:
Base
  • Object
show all
Defined in:
lib/fluq/input/kafka.rb

Instance Method Summary collapse

Constructor Details

#initializeKafka

Constructor.

Examples:


FluQ::Input::Kafka.new handlers,
  topic: "page-views",
  group: "fluq",
  brokers: ["localhost:9092"],
  zookeepers: ["localhost:2181"]

Parameters:

  • options (Hash)

    a customizable set of options



29
30
31
# File 'lib/fluq/input/kafka.rb', line 29

def initialize(*)
  super
end

Instance Method Details

#descriptionString

Returns descriptive name.

Returns:

  • (String)

    descriptive name



39
40
41
# File 'lib/fluq/input/kafka.rb', line 39

def description
  "#{name} (#{config[:group]} <- #{config[:brokers].join(',')})"
end

#nameString

Returns short name.

Returns:

  • (String)

    short name



34
35
36
# File 'lib/fluq/input/kafka.rb', line 34

def name
  "kafka:#{config[:topic]}"
end

#process(partition, messages) ⇒ Object

Processes messages

Parameters:

  • partition (Integer)
  • messages (Array<Poseidon::Message>)


55
56
57
58
59
60
61
62
63
64
# File 'lib/fluq/input/kafka.rb', line 55

def process(partition, messages)
  events = []
  messages.each do |m|
    events.concat format.parse(m.value)
  end
  events.each do |event|
    event.meta.update topic: config[:topic], partition: partition
  end
  worker.process events
end

#runObject

Start the loop



44
45
46
47
48
49
50
# File 'lib/fluq/input/kafka.rb', line 44

def run
  super

  consumer.fetch_loop do |partition, bulk|
    process partition, bulk
  end
end