Class: NulogyMessageBusConsumer::Tasks::LogConsumerLag
- Inherits: Object
  - Object
  - NulogyMessageBusConsumer::Tasks::LogConsumerLag
- Defined in: lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb
Defined Under Namespace
Modules: Calculator
Instance Attribute Summary
- #interval ⇒ Object (readonly)
  Returns the value of attribute interval.
Instance Method Summary
- #call ⇒ Object
- #extract_args(kafka_consumer:, **_) ⇒ Object
- #initialize(logger, interval_seconds, lag_timeout) ⇒ LogConsumerLag (constructor)
  A new instance of LogConsumerLag.
Constructor Details
#initialize(logger, interval_seconds, lag_timeout) ⇒ LogConsumerLag
Returns a new instance of LogConsumerLag.
    # File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 6
    def initialize(logger, interval_seconds, lag_timeout)
      @logger = logger
      @interval = interval_seconds
      @lag_timeout = lag_timeout
    end
Instance Attribute Details
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
    # File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 4
    def interval
      @interval
    end
Instance Method Details
#call ⇒ Object
    # File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 16
    def call
      # Delayed start. If we attempt to read consumer#committed immediately, it may fail.
      # We suspect this is because the consumer#committed is called before the consumer
      # has finished connecting. There appears to be a race condition.
      KafkaUtils.wait_for_assignment(@kafka_consumer)

      # Note: consumer#committed has a timeout of 1200ms. To respect our lag_timeout, use the largest.
      committed_timeout = [1200, @lag_timeout].max

      # The first parameter is a TopicPartitionList. When nil, it uses all the assigned ones.
      committed_offsets = @kafka_consumer.committed(nil, committed_timeout)
      lag_per_topic = @kafka_consumer.lag(committed_offsets, @lag_timeout)

      @logger.info(JSON.dump({
        event: "consumer_lag",
        topics: Calculator.add_max_lag(lag_per_topic)
      }))
      $stdout.flush
    end
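The flow of #call can be tried without a Kafka broker. The following is a minimal sketch, not the library's implementation: FakeConsumer and log_consumer_lag are hypothetical stand-ins, the nested hash returned by FakeConsumer#lag is an assumed shape, and both the wait for assignment and Calculator.add_max_lag are left out because their behavior is not shown on this page. It illustrates that the committed-offsets lookup never uses a timeout below 1200 ms, while the lag lookup uses lag_timeout unchanged.

```ruby
require "json"
require "logger"
require "stringio"

# Hypothetical stand-in for the Kafka consumer. It records the timeouts it
# receives and returns canned data (the return shape is an assumption).
class FakeConsumer
  attr_reader :committed_timeout, :lag_timeout

  def committed(_list, timeout_ms)
    @committed_timeout = timeout_ms
    :committed_offsets
  end

  def lag(_committed_offsets, timeout_ms)
    @lag_timeout = timeout_ms
    { "orders" => { 0 => 3, 1 => 7 } }
  end
end

# Hypothetical helper mirroring the body of #call, without the wait for
# assignment and without Calculator.add_max_lag.
def log_consumer_lag(consumer, logger, lag_timeout)
  # Never go below the 1200 ms timeout mentioned in the source comment.
  committed_timeout = [1200, lag_timeout].max
  committed_offsets = consumer.committed(nil, committed_timeout)
  lag_per_topic = consumer.lag(committed_offsets, lag_timeout)
  logger.info(JSON.dump({ event: "consumer_lag", topics: lag_per_topic }))
end

# Capture the log output in memory and print only the message itself.
output = StringIO.new
logger = Logger.new(output)
logger.formatter = ->(_severity, _time, _progname, msg) { "#{msg}\n" }

consumer = FakeConsumer.new
log_consumer_lag(consumer, logger, 250)

puts consumer.committed_timeout # 1200, because 250 is below the floor
puts consumer.lag_timeout       # 250, passed through unchanged
puts output.string              # one JSON line with event "consumer_lag"
```

With a lag_timeout above 1200 (for example 5000), both lookups would receive the same value.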
#extract_args(kafka_consumer:, **_) ⇒ Object
    # File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 12
    def extract_args(kafka_consumer:, **_)
      @kafka_consumer = kafka_consumer
    end
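The signature extract_args(kafka_consumer:, **_) requires the kafka_consumer keyword and silently discards any other keywords, which presumably lets a caller pass one shared set of arguments to several tasks. A small sketch of that Ruby behavior follows; ExampleTask and the extra keywords are hypothetical and are not part of the library.

```ruby
# Hypothetical task class that copies only the signature of #extract_args.
class ExampleTask
  attr_reader :kafka_consumer

  # kafka_consumer: is required; **_ swallows every other keyword.
  def extract_args(kafka_consumer:, **_)
    @kafka_consumer = kafka_consumer
  end
end

task = ExampleTask.new

# Extra keywords are accepted and ignored.
task.extract_args(kafka_consumer: :fake_consumer, logger: nil, retries: 3)
puts task.kafka_consumer.inspect # :fake_consumer

# Omitting the required keyword raises ArgumentError.
begin
  ExampleTask.new.extract_args(logger: nil)
rescue ArgumentError => e
  puts e.message
end
```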