Class: Fluent::KafkaInput::TopicWatcher

Inherits:
Coolio::TimerWatcher
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic, host, port, client_id, partition, offset, interval, format, message_key, add_prefix, add_suffix, options = {}) ⇒ TopicWatcher

Returns a new instance of TopicWatcher.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/in_kafka.rb', line 75

# Builds a watcher for one topic partition and registers it as a timer.
#
# topic, add_prefix and add_suffix are combined into the tag in #consume;
# format and message_key control how #parse_line turns a payload into a record.
# client_id, host, port, partition, offset and options are forwarded unchanged
# to Poseidon::PartitionConsumer.new. interval is forwarded to the timer
# superclass.
def initialize(topic, host, port, client_id, partition, offset, interval, format, message_key, add_prefix, add_suffix, options={})
  @topic, @format, @message_key = topic, format, message_key
  @add_prefix, @add_suffix = add_prefix, add_suffix

  # Bound method object invoked from #on_timer.
  @callback = method(:consume)

  # Positional order: client_id, host, port, topic, partition, offset, options.
  @consumer = Poseidon::PartitionConsumer.new(client_id, host, port, topic, partition, offset, options)

  # NOTE(review): the second argument presumably marks the timer as repeating;
  # confirm against Coolio::TimerWatcher.
  super(interval, true)
end

Instance Method Details

#consume ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/in_kafka.rb', line 103

# Fetches the pending messages from the partition consumer, parses each one
# with #parse_line and emits the successfully parsed records as one event
# stream.
#
# The emitted tag is the topic name, prefixed with "<add_prefix>." and/or
# suffixed with ".<add_suffix>" when those settings are present. Every record
# is stamped with the current wall-clock time (integer seconds).
#
# A message that fails to parse or to be added is logged at warn level and
# skipped; the remaining messages are still processed. Nothing is emitted when
# no record was collected.
def consume
  es = MultiEventStream.new

  tag = @topic
  tag = "#{@add_prefix}.#{tag}" if @add_prefix
  tag = "#{tag}.#{@add_suffix}" if @add_suffix

  @consumer.fetch.each do |msg|
    begin
      msg_record = parse_line(msg.value)
      es.add(Time.now.to_i, msg_record)
    rescue
      # When parse_line itself raised, msg_record is still nil; fall back to
      # the raw payload so the warning identifies the offending message.
      $log.warn((msg_record || msg.value).to_s, :error => $!.to_s)
      $log.debug_backtrace
    end
  end

  Engine.emit_stream(tag, es) unless es.empty?
end

#on_timer ⇒ Object



95
96
97
98
99
100
101
# File 'lib/fluent/plugin/in_kafka.rb', line 95

# Timer hook: invokes the stored callback. Any StandardError it raises is
# written to the error log (message, then backtrace) rather than propagated.
def on_timer
  @callback.call
rescue StandardError => error
  $log.error error.to_s
  $log.error_backtrace
end

#parse_line(record) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/in_kafka.rb', line 123

# Converts one raw message payload into a record according to @format.
#
# 'json', 'ltsv' and 'msgpack' delegate to Yajl::Parser.parse, LTSV.parse and
# MessagePack.unpack respectively and return whatever those return. 'text'
# wraps the payload in a single-entry hash keyed by @message_key. Any other
# format yields an empty hash.
def parse_line(record)
  case @format
  when 'json'    then Yajl::Parser.parse(record)
  when 'ltsv'    then LTSV.parse(record)
  when 'msgpack' then MessagePack.unpack(record)
  when 'text'    then { @message_key => record }
  else {}
  end
end