Class: NulogyMessageBusConsumer::Tasks::LogConsumerLag

Inherits:
Object
  • Object
show all
Defined in:
lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb

Defined Under Namespace

Modules: Calculator

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, interval_seconds, lag_timeout) ⇒ LogConsumerLag

Returns a new instance of LogConsumerLag.



6
7
8
9
10
# File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 6

def initialize(logger, interval_seconds, lag_timeout)
  @logger = logger
  @interval = interval_seconds
  @lag_timeout = lag_timeout
end

Instance Attribute Details

#intervalObject (readonly)

Returns the value of attribute interval.



4
5
6
# File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 4

def interval
  @interval
end

Instance Method Details

#callObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 16

def call
  # Delayed start. If we attempt to read consumer#committed immediately, it may fail.
  # We suspect this is because the consumer#committed is called before the consumer
  # has finished connecting. There appears to be a race condition.
  KafkaUtils.wait_for_assignment(@kafka_consumer)

  # Note: consumer#committed has a timeout of 1200ms. To respect our lag_timeout, use the largest.
  committed_timeout = [1200, @lag_timeout].max
  # The first parameter is a TopicPartitionList. When nil, it uses all the assigned ones.
  committed_offsets = @kafka_consumer.committed(nil, committed_timeout)

  lag_per_topic = @kafka_consumer.lag(committed_offsets, @lag_timeout)

  @logger.info(JSON.dump({
    event: "consumer_lag",
    topics: Calculator.add_max_lag(lag_per_topic)
  }))
  $stdout.flush
end

#extract_args(kafka_consumer:, **_) ⇒ Object



12
13
14
# File 'lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb', line 12

def extract_args(kafka_consumer:, **_)
  @kafka_consumer = kafka_consumer
end