Class: Fluent::KafkaInput::OffsetManager
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Fluent::KafkaInput::OffsetManager
- Defined in:
- lib/fluent/plugin/in_kafka.rb
Instance Method Summary
- #create_node(zk_path, topic, partition) ⇒ Object
- #initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager (constructor): Returns a new instance of OffsetManager.
- #next_offset ⇒ Object
- #save_offset(offset) ⇒ Object
Constructor Details
#initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager
Returns a new instance of OffsetManager.
336 337 338 339 340 |
# File 'lib/fluent/plugin/in_kafka.rb', line 336
#
# Stores the ZooKeeper client, derives the path of the node holding the next
# offset for the given topic partition, and hands that path to #create_node.
#
# @param topic_entry [#topic, #partition] supplies the topic name and partition
# @param zookeeper [Object] client kept in @zookeeper
# @param zk_root_node [#to_s] prefix placed in front of "/<topic>/<partition>/next_offset"
def initialize(topic_entry, zookeeper, zk_root_node)
  topic = topic_entry.topic
  partition = topic_entry.partition

  @zookeeper = zookeeper
  @zk_path = "#{zk_root_node}/#{topic}/#{partition}/next_offset"
  create_node(@zk_path, topic, partition)
end
Instance Method Details
#create_node(zk_path, topic, partition) ⇒ Object
342 343 344 345 346 347 348 349 |
# File 'lib/fluent/plugin/in_kafka.rb', line 342
#
# Issues one @zookeeper.create call per cumulative prefix of +zk_path+
# ("/a", "/a/b", "/a/b/c", ...), then trace-logs the last prefix built.
#
# @param zk_path [String] slash-separated node path
# @param topic [Object] not used by this method; kept for caller compatibility
# @param partition [Object] not used by this method; kept for caller compatibility
# @return [Object] whatever $log.trace returns
def create_node(zk_path, topic, partition)
  # The capture group makes split keep each "/segment" piece; the empty
  # strings it leaves between consecutive matches are discarded.
  segments = zk_path.split(%r{(/[^/]+)}).reject(&:empty?)

  deepest = segments.inject("") do |prefix, segment|
    current = prefix + segment
    @zookeeper.create(:path => current)
    current
  end

  $log.trace "use zk offset node : #{deepest}"
end
#next_offset ⇒ Object
351 352 353 |
# File 'lib/fluent/plugin/in_kafka.rb', line 351
#
# Reads the node at @zk_path through @zookeeper and converts its :data
# entry to an integer with #to_i.
#
# @return [Integer] the stored data interpreted as an integer
def next_offset
  node = @zookeeper.get(:path => @zk_path)
  node[:data].to_i
end
#save_offset(offset) ⇒ Object
355 356 357 358 |
# File 'lib/fluent/plugin/in_kafka.rb', line 355
#
# Writes +offset+, converted to a string, as the data of the node at
# @zk_path, then trace-logs the value that was written.
#
# @param offset [#to_s] offset to store
# @return [Object] whatever $log.trace returns
def save_offset(offset)
  # Convert once and reuse the same text for both the write and the log line.
  data = offset.to_s
  @zookeeper.set(:path => @zk_path, :data => data)
  $log.trace "update zk offset node : #{data}"
end