Method: Rdkafka::Consumer#store_offset

Defined in:
lib/rdkafka/consumer.rb

#store_offset(message) ⇒ nil

Store offset of a message to be used in the next commit of this consumer

When using this enable.auto.offset.store should be set to false in the config.

Parameters:

Returns:

  • (nil)

Raises:


394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
# File 'lib/rdkafka/consumer.rb', line 394

def store_offset(message)
  closed_consumer_check(__method__)

  list = TopicPartitionList.new
  list.add_topic_and_partitions_with_offsets(
    message.topic,
    message.partition => message.offset + 1
  )

  tpl = list.to_native_tpl

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_offsets_store(
      inner,
      tpl
    )
  end

  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end