Method: Rdkafka::Consumer#position
- Defined in:
- lib/rdkafka/consumer.rb
permalink #position(list = nil) ⇒ TopicPartitionList
Return the current positions (offsets) for topics and partitions. The offset field of each requested partition will be set to the offset of the last consumed message + 1, or nil in case there was no previous message.
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
# File 'lib/rdkafka/consumer.rb', line 281 def position(list=nil) if list.nil? list = assignment elsif !list.is_a?(TopicPartitionList) raise TypeError.new("list has to be nil or a TopicPartitionList") end tpl = list.to_native_tpl response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_position(inner, tpl) end if response != 0 raise Rdkafka::RdkafkaError.new(response) end TopicPartitionList.from_native_tpl(tpl) end |