Class: Fluent::KafkaInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka.rb

Defined Under Namespace

Classes: TopicWatcher

Instance Method Summary collapse

Constructor Details

#initialize ⇒ KafkaInput

Returns a new instance of KafkaInput.



24
25
26
27
# File 'lib/fluent/plugin/in_kafka.rb', line 24

# Creates the plugin instance: runs the parent constructor, then loads the
# 'poseidon' library.
def initialize
  super
  # Required here rather than at file load time, so the library is only
  # loaded once an instance is actually constructed.
  require 'poseidon'
end

Instance Method Details

#configure(conf) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/in_kafka.rb', line 29

# Validates the configuration and loads the library needed for the chosen
# message format.
#
# Splits the comma-separated @topics string into @topic_list, trimming
# surrounding whitespace and discarding blank entries.
# NOTE(review): @topics and @format are presumably populated by the parent
# class's configure (called via super) -- confirm against the parameter
# declarations.
#
# @param conf the configuration object; forwarded unchanged to the parent via super
# @raise [ConfigError] when no non-blank topic name is left after trimming
def configure(conf)
  super
  # Drop blank entries so input such as "a,,b" or " , " can never produce an
  # empty topic name that slips past the emptiness check below.
  @topic_list = @topics.split(',').map(&:strip).reject(&:empty?)
  if @topic_list.empty?
    raise ConfigError, "kafka: 'topics' is a required parameter"
  end

  # Only these three formats need a library loaded here; any other value of
  # @format falls through without loading anything.
  case @format
  when 'json'
    require 'yajl'
  when 'ltsv'
    require 'ltsv'
  when 'msgpack'
    require 'msgpack'
  end
end

#run ⇒ Object



67
68
69
70
71
72
# File 'lib/fluent/plugin/in_kafka.rb', line 67

# Runs the event loop held in @loop. Any StandardError raised by the loop is
# reported through the global logger (message plus backtrace) instead of
# propagating to the caller.
def run
  begin
    @loop.run
  rescue => e
    $log.error("unexpected error", :error => e.to_s)
    $log.error_backtrace
  end
end

#shutdown ⇒ Object



63
64
65
# File 'lib/fluent/plugin/in_kafka.rb', line 63

# Stops the event loop and waits for the worker thread to finish.
#
# The thread is the one stored in @thread by #start, which executes #run
# (and therefore @loop.run). Stopping the loop lets that thread return.
def shutdown
  @loop.stop
  # Join so shutdown does not return while the loop thread is still running;
  # guarded because @thread is only assigned by #start.
  @thread.join if @thread
end

#start ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/in_kafka.rb', line 46

# Creates the event loop, builds one TopicWatcher per entry in @topic_list,
# attaches each watcher to the loop, and starts a thread that executes #run.
#
# The thread is stored in @thread and is also the method's return value.
def start
  @loop = Coolio::Loop.new

  # Pass along only the fetch options that hold a truthy value; unset ones
  # are left out of the hash entirely.
  opt = {
    :max_bytes => @max_bytes,
    :max_wait_ms => @max_wait_ms,
    :min_bytes => @min_bytes,
    :socket_timeout_ms => @socket_timeout_ms
  }.select { |_name, value| value }

  # NOTE(review): `interval` is invoked as a method, unlike the other settings
  # which are read as instance variables -- presumably an accessor defined
  # elsewhere; confirm.
  @topic_watchers = @topic_list.map do |topic|
    TopicWatcher.new(topic, @host, @port, @client_id, @partition, @offset, interval, @format, @message_key, @add_prefix, @add_suffix, opt)
  end
  @topic_watchers.each { |watcher| watcher.attach(@loop) }

  @thread = Thread.new { run }
end