Method: Rdkafka::Consumer#position

Defined in:
lib/rdkafka/consumer.rb

#position(list = nil) ⇒ TopicPartitionList

Returns the current positions (offsets) for the given topics and partitions. The offset field of each requested partition is set to the offset of the last consumed message + 1, or nil if no message has been consumed from that partition yet.
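As an illustration of how the returned list might be consumed, the sketch below flattens it into a plain hash. The helper name next_offsets is hypothetical and not part of rdkafka-ruby. It assumes the returned TopicPartitionList responds to #to_h with topic names as keys and arrays of partition objects (or nil) as values, and that each partition object exposes #partition and #offset.

```ruby
# Hypothetical helper, not part of rdkafka-ruby.
# Flattens the result of Consumer#position into
# { "topic/partition" => next_offset_or_nil }.
#
# Assumptions: the object returned by #position responds to #to_h with
# { topic_name => [partition, ...] or nil }, and each partition
# responds to #partition and #offset.
def next_offsets(consumer, list = nil)
  positions = consumer.position(list)

  result = {}
  positions.to_h.each do |topic, partitions|
    # Skip topics that carry no partition entries.
    (partitions || []).each do |part|
      # offset is nil when nothing has been consumed from this partition yet.
      result["#{topic}/#{part.partition}"] = part.offset
    end
  end
  result
end
```

With a real consumer this would be called as next_offsets(consumer) after some messages have been polled. A nil value marks a partition that has no previously consumed message.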

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topics and partitions to get the positions for, or nil to use the consumer's current assignment.

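Returns:

  • (TopicPartitionList)

Raises:

  • (TypeError)

    When list is neither nil nor a TopicPartitionList.

  • (RdkafkaError)

    When getting the positions fails.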


# File 'lib/rdkafka/consumer.rb', line 281

def position(list=nil)
  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_position(inner, tpl)
  end

  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  TopicPartitionList.from_native_tpl(tpl)
end