Method: Rdkafka::Consumer#lag
Defined in: lib/rdkafka/consumer.rb
#lag(topic_partition_list, watermark_timeout_ms = 1000) ⇒ Hash<String, Hash<Integer, Integer>>
Calculates the consumer lag per partition for the provided topic partition list. The lag of a partition is its high watermark minus the offset given for it in the list; partitions without an offset are skipped. You can get a suitable list by calling #committed or #position (TODO). You can also build one yourself, in which case the list must already contain every partition you need the lag for.
# File 'lib/rdkafka/consumer.rb', line 344

def lag(topic_partition_list, watermark_timeout_ms=1000)
  out = {}
  topic_partition_list.to_h.each do |topic, partitions|
    # Query high watermarks for this topic's partitions
    # and compare to the offset in the list.
    topic_out = {}
    partitions.each do |p|
      next if p.offset.nil?
      low, high = query_watermark_offsets(
        topic,
        p.partition,
        watermark_timeout_ms
      )
      topic_out[p.partition] = high - p.offset
    end
    out[topic] = topic_out
  end
  out
end
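The shape of the returned hash is easier to see with concrete numbers. The following is a minimal sketch that mirrors the calculation above without a broker. `Partition`, `compute_lag`, and the `watermarks` lambda are hypothetical stand-ins introduced only for illustration and are not part of rdkafka. The lambda plays the role of query_watermark_offsets and is assumed to return a `[low, high]` pair.

```ruby
# Hypothetical stand-in for the entries of a topic partition list.
# Only the two fields that #lag reads are modeled here.
Partition = Struct.new(:partition, :offset)

# Sketch of the lag calculation: high watermark minus the listed offset.
# `watermarks` is an assumed callable that returns [low, high] for a topic
# and partition, standing in for query_watermark_offsets.
def compute_lag(list, watermarks)
  list.each_with_object({}) do |(topic, partitions), out|
    out[topic] = partitions.each_with_object({}) do |p, topic_out|
      next if p.offset.nil? # no offset recorded, so no lag is reported
      _low, high = watermarks.call(topic, p.partition)
      topic_out[p.partition] = high - p.offset
    end
  end
end

# Illustrative data: partition 1 has no offset and should be skipped.
list = {
  "events" => [
    Partition.new(0, 90),
    Partition.new(1, nil),
    Partition.new(2, 100)
  ]
}

# Made-up high watermarks, keyed by [topic, partition].
highs = { ["events", 0] => 100, ["events", 2] => 100 }
watermarks = ->(topic, partition) { [0, highs.fetch([topic, partition])] }

lag = compute_lag(list, watermarks)
p lag
```

Here the result maps "events" to a lag of 10 for partition 0 and 0 for partition 2, while partition 1 is omitted because it has no offset. With a live consumer, the corresponding call would presumably be `consumer.lag(consumer.committed)`, since #committed is named above as a source of a suitable list.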