Class: Fluent::KafkaInput
- Inherits:
-
Input
- Object
- Input
- Fluent::KafkaInput
- Defined in:
- lib/fluent/plugin/in_kafka.rb
Defined Under Namespace
Classes: TopicWatcher
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ KafkaInput
constructor
A new instance of KafkaInput.
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ KafkaInput
Returns a new instance of KafkaInput.
24 25 26 27 |
# File 'lib/fluent/plugin/in_kafka.rb', line 24 def initialize super require 'poseidon' end |
Instance Method Details
#configure(conf) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/in_kafka.rb', line 29 def configure(conf) super @topic_list = @topics.split(',').map {|topic| topic.strip } if @topic_list.empty? raise ConfigError, "kafka: 'topics' is a require parameter" end case @format when 'json' require 'yajl' when 'ltsv' require 'ltsv' when 'msgpack' require 'msgpack' end end |
#run ⇒ Object
67 68 69 70 71 72 |
# File 'lib/fluent/plugin/in_kafka.rb', line 67 def run @loop.run rescue $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end |
#shutdown ⇒ Object
63 64 65 |
# File 'lib/fluent/plugin/in_kafka.rb', line 63 def shutdown @loop.stop end |
#start ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/fluent/plugin/in_kafka.rb', line 46 def start @loop = Coolio::Loop.new opt = {} opt[:max_bytes] = @max_bytes if @max_bytes opt[:max_wait_ms] = @max_wait_ms if @max_wait_ms opt[:min_bytes] = @min_bytes if @min_bytes opt[:socket_timeout_ms] = @socket_timeout_ms if @socket_timeout_ms @topic_watchers = @topic_list.map {|topic| TopicWatcher.new(topic, @host, @port, @client_id, @partition, @offset, interval, @format, @message_key, @add_prefix, @add_suffix, opt) } @topic_watchers.each {|tw| tw.attach(@loop) } @thread = Thread.new(&method(:run)) end |