Class: NulogyMessageBusConsumer::Tasks::SuperviseConsumerLag

Inherits:
Object
Defined in:
lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb

Overview

Supervises the consumer’s lag.

If a partition’s lag is non-zero and does not change for an extended period, the task kills the main thread.
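LagTracker’s internals are not shown on this page. As a rough illustration of the rule above, here is a minimal sketch of a tracker that flags a partition once its lag has been non-zero and unchanged for `failing_checks` consecutive checks after a baseline check. `StallTracker` and `failing_partitions` are hypothetical names, not the gem’s API, and the input is assumed to have the `{ topic => { partition => lag } }` shape that rdkafka-ruby’s `Consumer#lag` returns.

```ruby
# Hypothetical stand-in for NulogyMessageBusConsumer::LagTracker: the
# real class's internals are not documented on this page.
class StallTracker
  def initialize(failing_checks:)
    @failing_checks = failing_checks
    @last_lag = {}         # [topic, partition] => lag seen at the previous check
    @stalled = Hash.new(0) # [topic, partition] => consecutive stalled checks
  end

  # Expects { "topic" => { partition => lag } }, the shape rdkafka-ruby's
  # Consumer#lag returns; a nil lag (no committed offset) counts as zero.
  def update(lag_by_topic)
    lag_by_topic.each do |topic, partitions|
      partitions.each do |partition, lag|
        key = [topic, partition]
        stalled = lag.to_i.positive? && @last_lag[key] == lag
        @stalled[key] = stalled ? @stalled[key] + 1 : 0
        @last_lag[key] = lag
      end
    end
  end

  def failing?
    !failing_partitions.empty?
  end

  # [topic, partition] pairs whose lag has stayed stuck for failing_checks checks.
  def failing_partitions
    @stalled.select { |_key, count| count >= @failing_checks }.keys
  end
end

tracker = StallTracker.new(failing_checks: 2)
3.times { tracker.update({ "orders" => { 0 => 5, 1 => 0 } }) }
tracker.failing?           # => true
tracker.failing_partitions # => [["orders", 0]]
```

Partition 1 never counts as stalled because its lag is zero; any change in a partition’s lag resets its counter.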

That period is check_interval_seconds * LagTracker#failing_checks. With the defaults, that is roughly 20 * 6 = 120 seconds (2 minutes).
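The arithmetic above, plus its inverse for choosing failing_checks to cover a target window, can be sketched as two small helpers. Both helper names are hypothetical and not part of the gem; the real window is only approximate because each check itself takes some time.

```ruby
# Hypothetical helpers; not part of nulogy_message_bus_consumer.

# Approximate time a partition's lag must stay non-zero and unchanged
# before the supervisor kills the main thread.
def detection_window_seconds(check_interval_seconds:, failing_checks:)
  check_interval_seconds * failing_checks
end

# Smallest failing_checks value whose window covers at least window_seconds.
def failing_checks_for(window_seconds:, check_interval_seconds:)
  (window_seconds / check_interval_seconds.to_f).ceil
end

detection_window_seconds(check_interval_seconds: 20, failing_checks: 6) # => 120
failing_checks_for(window_seconds: 300, check_interval_seconds: 30)    # => 10
```

A computed value could then be passed through the constructor signature documented on this page, for example `NulogyMessageBusConsumer::Tasks::SuperviseConsumerLag.new(logger, tracker: NulogyMessageBusConsumer::LagTracker.new(failing_checks: 10), check_interval_seconds: 30)` for a window of about 5 minutes.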

Note that this strategy may not work for a busy integration. Consumer lag monitoring should alert in that case. However, this strategy may help alleviate alerts for low traffic or off-peak environments.

We’ve come across cases where the consumer lag is still being logged and messages are still being processed, but the consumer has stopped consuming messages from particular topics.

Killing the main thread causes ECS to restart the task.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(logger, tracker: NulogyMessageBusConsumer::LagTracker.new(failing_checks: 6), killable: nil, check_interval_seconds: 20) ⇒ SuperviseConsumerLag

Returns a new instance of SuperviseConsumerLag.



# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 24

def initialize(logger, tracker: NulogyMessageBusConsumer::LagTracker.new(failing_checks: 6), killable: nil, check_interval_seconds: 20)
  @logger = logger
  @tracker = tracker
  @killable = killable
  @interval = check_interval_seconds
end

Instance Attribute Details

#interval ⇒ Object (readonly)

Returns the value of attribute interval.



# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 22

def interval
  @interval
end

Instance Method Details

#call ⇒ Object



# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 36

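def call
  NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment(@consumer)

  # Feed the per-partition lag (computed from committed offsets) to the tracker.
  @tracker.update(@consumer.lag(@consumer.committed))
  if @tracker.failing?
    log_failed_partitions

    # Kill the killable (by default, the thread that called #extract_args),
    # then stop the thread running this check.
    @killable.kill
    Thread.current.exit
  end
end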
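This page does not show what invokes #call every interval seconds. The sketch below assumes a plain loop on a dedicated thread and replays #call’s control flow (omitting KafkaUtils.wait_for_assignment and log_failed_partitions) against hypothetical test doubles: a consumer whose lag never moves, a simple counting tracker, and a killable that records the kill instead of killing a real thread. It shows that once the tracker reports failing, the killable is killed and the supervising thread exits itself.

```ruby
# Hypothetical test doubles: they only mimic the methods #call relies on.
class StuckConsumer
  # Placeholder for the committed offsets an rdkafka consumer would return.
  def committed
    :committed_offsets
  end

  # Same non-zero lag on every check, as if partition 0 were stuck.
  def lag(_committed)
    { "orders" => { 0 => 42 } }
  end
end

class CountingTracker
  attr_reader :updates

  def initialize(failing_checks:)
    @failing_checks = failing_checks
    @previous = nil
    @unchanged = 0
    @updates = 0
  end

  # Counts consecutive checks whose lag equals the previous check's lag
  # (the non-zero condition is omitted for brevity).
  def update(lag)
    @updates += 1
    @unchanged = (lag == @previous) ? @unchanged + 1 : 0
    @previous = lag
  end

  def failing?
    @unchanged >= @failing_checks
  end
end

class RecordingKillable
  attr_reader :killed

  def initialize
    @killed = false
  end

  def kill
    @killed = true
  end
end

# Assumed scheduler: one check every `interval` seconds on its own thread.
def supervise(consumer:, tracker:, killable:, interval:)
  Thread.new do
    loop do
      tracker.update(consumer.lag(consumer.committed))
      if tracker.failing?
        killable.kill       # real task: the thread captured in #extract_args
        Thread.current.exit # real task: stops the supervising thread
      end
      sleep interval
    end
  end
end

killable = RecordingKillable.new
tracker = CountingTracker.new(failing_checks: 3)
thread = supervise(consumer: StuckConsumer.new, tracker: tracker,
                   killable: killable, interval: 0.01)
thread.join
killable.killed # => true
tracker.updates # => 4 (one baseline check plus three unchanged ones)
```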

#extract_args(kafka_consumer:, **_) ⇒ Object



# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 31

def extract_args(kafka_consumer:, **_)
  @consumer = kafka_consumer
  @killable ||= Thread.current
end
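Because of `@killable ||= Thread.current`, the default killable is whichever thread calls #extract_args, while a killable passed to the constructor is kept as-is. A minimal reproduction of just that pattern (the class name is hypothetical) makes the consequence visible: the default only targets the main thread if #extract_args runs on the main thread.

```ruby
# Hypothetical class reproducing only the defaulting logic of #extract_args.
class KillableDefaultSketch
  attr_reader :killable

  def initialize(killable: nil)
    @killable = killable
  end

  def extract_args(kafka_consumer:, **_)
    @consumer = kafka_consumer
    @killable ||= Thread.current # only assigned if no killable was injected
  end
end

# Default: the calling thread becomes the killable.
sketch = KillableDefaultSketch.new
sketch.extract_args(kafka_consumer: :consumer, other_option: 1)
sketch.killable.equal?(Thread.current) # => true

# Injected killable (e.g. a test double) is left untouched.
double = Object.new
injected = KillableDefaultSketch.new(killable: double)
injected.extract_args(kafka_consumer: :consumer)
injected.killable.equal?(double) # => true

# Called from a worker thread, the default captures that worker instead.
other = KillableDefaultSketch.new
worker = Thread.new { other.extract_args(kafka_consumer: :consumer); Thread.current }.value
other.killable.equal?(worker) # => true
```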