Class: Fluent::KafkaInput::TopicWatcher
- Inherits:
  - Coolio::TimerWatcher
- Inheritance chain:
  - Object
  - Coolio::TimerWatcher
  - Fluent::KafkaInput::TopicWatcher
- Defined in:
- lib/fluent/plugin/in_kafka.rb
Instance Method Summary
- #consume ⇒ Object
-
#initialize(topic, host, port, client_id, partition, offset, interval, format, message_key, add_prefix, add_suffix, options = {}) ⇒ TopicWatcher
constructor
A new instance of TopicWatcher.
- #on_timer ⇒ Object
- #parse_line(record) ⇒ Object
Constructor Details
#initialize(topic, host, port, client_id, partition, offset, interval, format, message_key, add_prefix, add_suffix, options = {}) ⇒ TopicWatcher
Returns a new instance of TopicWatcher.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/fluent/plugin/in_kafka.rb', line 75
#
# Sets up a watcher for one topic partition: remembers how records should be
# parsed and tagged, creates the Poseidon partition consumer, and hands the
# polling interval to the timer superclass.
#
# @param topic [Object] topic name; also used as the base of the emitted tag
# @param host [Object] forwarded to Poseidon::PartitionConsumer.new
# @param port [Object] forwarded to Poseidon::PartitionConsumer.new
# @param client_id [Object] forwarded to Poseidon::PartitionConsumer.new
# @param partition [Object] forwarded to Poseidon::PartitionConsumer.new
# @param offset [Object] forwarded to Poseidon::PartitionConsumer.new
# @param interval [Object] forwarded to the superclass constructor
# @param format [String] record format consulted by #parse_line
# @param message_key [Object] hash key used by #parse_line for the 'text' format
# @param add_prefix [String, nil] optional tag prefix applied in #consume
# @param add_suffix [String, nil] optional tag suffix applied in #consume
# @param options [Hash] forwarded as the final consumer argument
def initialize(topic, host, port, client_id, partition, offset, interval, format, message_key, add_prefix, add_suffix, options = {})
  @topic = topic
  # on_timer invokes this bound method on every tick.
  @callback = method(:consume)
  @format = format
  @message_key = message_key
  @add_prefix = add_prefix
  @add_suffix = add_suffix
  @consumer = Poseidon::PartitionConsumer.new(
    client_id, # client_id
    host,      # host
    port,      # port
    topic,     # topic
    partition, # partition
    offset,    # offset
    options    # options
  )

  # NOTE(review): the second argument is presumably Coolio's "repeating" flag -- confirm.
  super(interval, true)
end
Instance Method Details
#consume ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/fluent/plugin/in_kafka.rb', line 103
#
# Fetches the pending messages from the partition consumer, parses each one
# with #parse_line, and emits the successfully parsed records as a single
# event stream. The tag is the topic name, optionally wrapped with
# "@add_prefix." and ".@add_suffix".
#
# A message that fails to parse is logged at warn level and skipped; the
# remaining messages of the batch are still processed. Nothing is emitted
# when no record was collected.
def consume
  es = MultiEventStream.new
  tag = @topic
  tag = "#{@add_prefix}.#{tag}" if @add_prefix
  tag = "#{tag}.#{@add_suffix}" if @add_suffix

  @consumer.fetch.each do |msg|
    begin
      es.add(Time.now.to_i, parse_line(msg.value))
    rescue => e
      # Log the raw payload: when parsing fails there is no parsed record
      # yet, so logging the record would only ever print an empty string.
      $log.warn msg.value.to_s, :error => e.to_s
      # NOTE(review): presumably reports the in-flight exception's backtrace -- confirm.
      $log.debug_backtrace
    end
  end

  Engine.emit_stream(tag, es) unless es.empty?
end
#on_timer ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/fluent/plugin/in_kafka.rb', line 95
#
# Timer hook: runs the stored callback. Any StandardError raised by the
# callback is logged at error level (message, then backtrace) and is not
# propagated to the caller.
def on_timer
  @callback.call
rescue => e
  $log.error e.to_s
  # NOTE(review): called without arguments; presumably picks up the in-flight exception -- confirm.
  $log.error_backtrace
end
#parse_line(record) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/fluent/plugin/in_kafka.rb', line 123
#
# Converts one raw message payload into a record according to @format.
#
# @param record [Object] raw message value taken from the consumer
# @return [Object] the parser's result for 'json', 'ltsv' and 'msgpack';
#   a one-entry Hash { @message_key => record } for 'text'; an empty Hash
#   when @format matches none of these
def parse_line(record)
  case @format
  when 'json'    then Yajl::Parser.parse(record)
  when 'ltsv'    then LTSV.parse(record)
  when 'msgpack' then MessagePack.unpack(record)
  when 'text'    then { @message_key => record }
  else {}
  end
end