Class: NulogyMessageBusConsumer::Tasks::SuperviseConsumerLag
- Inherits: Object
  - Object
  - NulogyMessageBusConsumer::Tasks::SuperviseConsumerLag
- Defined in: lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb
Overview
Supervises the consumer’s lag.
If a partition’s lag is non-zero and does not change for an extended period of time, then kill the main thread.
That period of time is check_interval_seconds * LagTracker#failing_checks. With the defaults, that is roughly 20 * 6 = 120 seconds (2 minutes).
Note that this strategy may not work for a busy integration. Consumer lag monitoring should alert in that case. However, this strategy may help alleviate alerts for low traffic or off-peak environments.
We’ve come across cases where the consumer lag is still being logged and messages are being processed, yet the consumer is not consuming messages from particular topics.
Killing the main thread causes ECS to restart the task.
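The stalled-lag rule described in this overview can be sketched as follows. This is only an illustration under stated assumptions: StalledLagTrackerSketch is a hypothetical stand-in, not the gem's LagTracker, and the flat "topic/partition" => lag hash is an assumed input shape rather than the format the real consumer returns.

```ruby
# Hypothetical stand-in for a lag tracker; NOT the gem's LagTracker.
class StalledLagTrackerSketch
  def initialize(failing_checks: 6)
    @failing_checks = failing_checks
    @last_lag = {}                 # partition key => lag seen at the previous check
    @stalled_checks = Hash.new(0)  # partition key => consecutive unchanged, non-zero checks
  end

  # lag_by_partition is an assumed shape, e.g. { "orders/0" => 12, "orders/1" => 0 }
  def update(lag_by_partition)
    lag_by_partition.each do |partition, lag|
      if lag.positive? && @last_lag[partition] == lag
        @stalled_checks[partition] += 1
      else
        # Lag is zero or it moved, so the partition is not considered stuck.
        @stalled_checks[partition] = 0
      end
      @last_lag[partition] = lag
    end
  end

  # True once any partition has been stuck for failing_checks consecutive checks.
  def failing?
    @stalled_checks.values.any? { |count| count >= @failing_checks }
  end
end

check_interval_seconds = 20
failing_checks = 6
window_seconds = check_interval_seconds * failing_checks
puts window_seconds # 120

tracker = StalledLagTrackerSketch.new(failing_checks: failing_checks)
# One baseline observation followed by six unchanged, non-zero observations.
7.times { tracker.update({ "orders/0" => 12 }) }
puts tracker.failing? # true
```

In this sketch a partition whose lag changes at all, even upward, resets its counter, which is one reason such a rule may never fire on a busy integration.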
Instance Attribute Summary
- #interval ⇒ Object (readonly): Returns the value of attribute interval.
Instance Method Summary
- #call ⇒ Object
- #extract_args(kafka_consumer:, **_) ⇒ Object
- #initialize(logger, tracker: NulogyMessageBusConsumer::LagTracker.new(failing_checks: 6), killable: nil, check_interval_seconds: 20) ⇒ SuperviseConsumerLag (constructor): A new instance of SuperviseConsumerLag.
Constructor Details
#initialize(logger, tracker: NulogyMessageBusConsumer::LagTracker.new(failing_checks: 6), killable: nil, check_interval_seconds: 20) ⇒ SuperviseConsumerLag
Returns a new instance of SuperviseConsumerLag.
# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 24
def initialize(logger, tracker: NulogyMessageBusConsumer::LagTracker.new(failing_checks: 6), killable: nil, check_interval_seconds: 20)
  @logger = logger
  @tracker = tracker
  @killable = killable
  @interval = check_interval_seconds
end
Instance Attribute Details
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 22
def interval
  @interval
end
Instance Method Details
#call ⇒ Object
# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 36
def call
  NulogyMessageBusConsumer::KafkaUtils.wait_for_assignment(@consumer)
  @tracker.update(@consumer.lag(@consumer.committed))

  if @tracker.failing?
    log_failed_partitions
    @killable.kill
    Thread.current.exit
  end
end
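The kill step in #call can be tried without Kafka. The following is a minimal sketch, assuming a stand-in tracker and a sleeping worker thread in place of the main consumer thread; AlwaysFailingTracker and supervise_once are hypothetical names used only for illustration, and logging is omitted.

```ruby
# Hypothetical stand-in that always reports a stalled partition.
class AlwaysFailingTracker
  def failing?
    true
  end
end

# One supervision step without Kafka: when the tracker is failing, kill the
# target thread and then end the supervising thread itself.
def supervise_once(tracker, killable)
  return unless tracker.failing?

  killable.kill        # Thread#kill terminates the target thread
  Thread.current.exit  # Thread#exit terminates the calling (supervisor) thread
end

worker = Thread.new { sleep } # stands in for the main consumer thread
supervisor = Thread.new { supervise_once(AlwaysFailingTracker.new, worker) }

supervisor.join
worker.join
puts worker.alive?     # false
puts supervisor.alive? # false
```

When the killable is the process's main thread rather than a worker, killing it ends the Ruby process, which is what lets ECS restart the task as described in the overview.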
#extract_args(kafka_consumer:, **_) ⇒ Object
# File 'lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb', line 31
def extract_args(kafka_consumer:, **_)
  @consumer = kafka_consumer
  @killable ||= Thread.current
end
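The signature of #extract_args accepts one required keyword and discards any others through `**_`, and `||=` keeps a killable supplied to the constructor while defaulting to the calling thread otherwise. A small sketch of that behavior, using a hypothetical ArgsSketch class and made-up extra keywords (logger:, extra:) that are not taken from the gem:

```ruby
# Hypothetical class mirroring only the argument handling of #extract_args.
class ArgsSketch
  attr_reader :consumer, :killable

  def initialize(killable: nil)
    @killable = killable
  end

  # Takes kafka_consumer: and silently ignores any other keyword arguments.
  def extract_args(kafka_consumer:, **_)
    @consumer = kafka_consumer
    # Keep an injected killable; otherwise default to the thread calling this method.
    @killable ||= Thread.current
  end
end

default_task = ArgsSketch.new
default_task.extract_args(kafka_consumer: :fake_consumer, logger: nil, extra: 1)
puts default_task.killable == Thread.current # true

injected = Object.new
injected_task = ArgsSketch.new(killable: injected)
injected_task.extract_args(kafka_consumer: :fake_consumer)
puts injected_task.killable.equal?(injected) # true
```

Injecting a killable this way is convenient in tests, since the default would otherwise kill the thread running the test.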