Method: Rdkafka::Consumer#lag

Defined in:
lib/rdkafka/consumer.rb

#lag(topic_partition_list, watermark_timeout_ms = 1000) ⇒ Hash<String, Hash<Integer, Integer>>

Calculates the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or #position (TODO). You can also build one yourself; in that case the list must already contain every partition you need the lag for.
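
Examples:

The arithmetic behind the result can be sketched in plain Ruby without a broker. This is only an illustration under stated assumptions: `partition_stub`, `offsets`, and `high_watermarks` are hypothetical stand-ins for the partition entries, the topic partition list, and the broker's high watermarks; none of them are part of the rdkafka API.

```ruby
# Hypothetical stand-in for the partition entries in a topic partition list.
partition_stub = Struct.new(:partition, :offset)

# Hypothetical consumer offsets, keyed by topic name.
offsets = {
  "events" => [partition_stub.new(0, 100), partition_stub.new(1, 250)]
}

# Hypothetical high watermarks, keyed by [topic, partition].
high_watermarks = { ["events", 0] => 120, ["events", 1] => 250 }

# Lag per partition is the high watermark minus the offset in the list.
lag = offsets.each_with_object({}) do |(topic, partitions), out|
  out[topic] = partitions.each_with_object({}) do |p, topic_out|
    topic_out[p.partition] = high_watermarks.fetch([topic, p.partition]) - p.offset
  end
end

puts lag.inspect
```

The resulting hash has the documented shape: topic name, then partition number, then lag. A partition that is fully caught up reports a lag of 0.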

Parameters:

  • topic_partition_list (TopicPartitionList)

    The list to calculate lag for.

  • watermark_timeout_ms (Integer) (defaults to: 1000)

    The timeout, in milliseconds, for each watermark offsets query. One query is made per partition.

Returns:

  • (Hash<String, Hash<Integer, Integer>>)

    A hash mapping each topic name to a hash of partition number to lag. Partitions whose offset is nil are omitted.

Raises:


# File 'lib/rdkafka/consumer.rb', line 344

def lag(topic_partition_list, watermark_timeout_ms=1000)
  out = {}

  topic_partition_list.to_h.each do |topic, partitions|
    # Query high watermarks for this topic's partitions
    # and compare to the offset in the list.
    topic_out = {}
    partitions.each do |p|
      next if p.offset.nil?
      low, high = query_watermark_offsets(
        topic,
        p.partition,
        watermark_timeout_ms
      )
      topic_out[p.partition] = high - p.offset
    end
    out[topic] = topic_out
  end
  out
end
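
The `next if p.offset.nil?` guard above means a partition without an offset is left out of the result, while its topic still appears (possibly with an empty hash). The following is a minimal sketch of that behaviour, not the library code itself: `partition_stub` and `query_watermarks` are hypothetical stand-ins, and the fixed `[0, 500]` watermarks are made up for illustration.

```ruby
# Hypothetical stand-in for the partition entries in a topic partition list.
partition_stub = Struct.new(:partition, :offset)

# Hypothetical replacement for query_watermark_offsets; returns [low, high].
query_watermarks = ->(_topic, _partition, _timeout_ms) { [0, 500] }

# Hypothetical list contents: two partitions have no offset.
list = {
  "events" => [partition_stub.new(0, 480), partition_stub.new(1, nil)],
  "audit"  => [partition_stub.new(0, nil)]
}

result = {}
list.each do |topic, partitions|
  topic_out = {}
  partitions.each do |p|
    # Skip partitions without an offset, as the method above does.
    next if p.offset.nil?
    _low, high = query_watermarks.call(topic, p.partition, 1000)
    topic_out[p.partition] = high - p.offset
  end
  result[topic] = topic_out
end

puts result.inspect
```

Callers that iterate over the result should therefore not assume that every partition in the input list has an entry.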