Class: Fluent::KafkaInput::OffsetManager

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager

Returns a new instance of OffsetManager.



336
337
338
339
340
# File 'lib/fluent/plugin/in_kafka.rb', line 336

def initialize(topic_entry, zookeeper, zk_root_node)
  @zookeeper = zookeeper
  @zk_path = "#{zk_root_node}/#{topic_entry.topic}/#{topic_entry.partition}/next_offset"
  create_node(@zk_path, topic_entry.topic, topic_entry.partition)
end

Instance Method Details

#create_node(zk_path, topic, partition) ⇒ Object



342
343
344
345
346
347
348
349
# File 'lib/fluent/plugin/in_kafka.rb', line 342

def create_node(zk_path, topic, partition)
  path = ""
  zk_path.split(/(\/[^\/]+)/).reject(&:empty?).each { |dir|
    path = path + dir
    @zookeeper.create(:path => "#{path}")
  }
  $log.trace "use zk offset node : #{path}"
end

#next_offsetObject



351
352
353
# File 'lib/fluent/plugin/in_kafka.rb', line 351

def next_offset
  @zookeeper.get(:path => @zk_path)[:data].to_i
end

#save_offset(offset) ⇒ Object



355
356
357
358
# File 'lib/fluent/plugin/in_kafka.rb', line 355

def save_offset(offset)
  @zookeeper.set(:path => @zk_path, :data => offset.to_s)
  $log.trace "update zk offset node : #{offset.to_s}"
end