Class: Fluent::KafkaInput::OffsetManager
- Inherits: Object
- Inheritance chain: Object → Fluent::KafkaInput::OffsetManager
- Defined in:
- lib/fluent/plugin/in_kafka.rb
Instance Method Summary
- #create_node(zk_path, topic, partition) ⇒ Object
- #initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager (constructor): A new instance of OffsetManager.
- #next_offset ⇒ Object
- #save_offset(offset) ⇒ Object
Constructor Details
#initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager
Returns a new instance of OffsetManager.
319 320 321 322 323 |
# File 'lib/fluent/plugin/in_kafka.rb', line 319
#
# Keeps the given ZooKeeper handle, derives the path of the offset node for
# the topic entry, and asks #create_node to create that path.
#
# @param topic_entry [#topic, #partition] source of the topic name and
#   partition that are interpolated into the node path
# @param zookeeper [Object] handle stored in @zookeeper
#   (presumably a ZooKeeper client -- confirm against the caller)
# @param zk_root_node [String] prefix placed in front of
#   "/<topic>/<partition>/next_offset"
def initialize(topic_entry, zookeeper, zk_root_node)
  @zookeeper = zookeeper
  @zk_path = "#{zk_root_node}/#{topic_entry.topic}/#{topic_entry.partition}/next_offset"
  create_node(@zk_path, topic_entry.topic, topic_entry.partition)
end
Instance Method Details
#create_node(zk_path, topic, partition) ⇒ Object
325 326 327 328 329 330 331 332 |
# File 'lib/fluent/plugin/in_kafka.rb', line 325
#
# Issues one @zookeeper.create call per cumulative prefix of +zk_path+
# (for "/a/b/c": "/a", then "/a/b", then "/a/b/c") and finally writes the
# last prefix to the trace log.
#
# The value returned by each create call is ignored.
#
# @param zk_path [String] slash-separated node path
# @param topic [Object] not used by this method; kept so existing callers
#   keep working
# @param partition [Object] not used by this method; kept so existing
#   callers keep working
# @return [Object] whatever $log.trace returns
def create_node(zk_path, topic, partition)
  path = ""
  # The capture group makes split keep each "/segment" piece; the empty
  # strings it leaves between consecutive matches are dropped.
  zk_path.split(%r{(/[^/]+)}).reject(&:empty?).each do |segment|
    # += builds a new String each round, so every create call receives its
    # own path object.
    path += segment
    @zookeeper.create(:path => path)
  end
  $log.trace "use zk offset node : #{path}"
end
#next_offset ⇒ Object
334 335 336 |
# File 'lib/fluent/plugin/in_kafka.rb', line 334
#
# Fetches the node at @zk_path through @zookeeper.get and converts the
# :data entry of the result with #to_i.
#
# @return [Integer] the converted value; because of #to_i semantics a nil
#   or non-numeric :data yields 0
def next_offset
  @zookeeper.get(:path => @zk_path)[:data].to_i
end
#save_offset(offset) ⇒ Object
338 339 340 341 |
# File 'lib/fluent/plugin/in_kafka.rb', line 338
#
# Writes +offset+, converted to a String, as the :data of the node at
# @zk_path through @zookeeper.set, then records the value in the trace log.
#
# @param offset [#to_s] value to store
# @return [Object] whatever $log.trace returns
def save_offset(offset)
  @zookeeper.set(:path => @zk_path, :data => offset.to_s)
  # Interpolation already calls #to_s, so no explicit conversion is needed.
  $log.trace "update zk offset node : #{offset}"
end