Class: NulogyMessageBusConsumer::LagTracker
- Inherits:
-
Object
- Object
- NulogyMessageBusConsumer::LagTracker
- Defined in:
- lib/nulogy_message_bus_consumer/lag_tracker.rb
Overview
Keeps track of how many times a topic’s partition has not changed (non-zero) lag between update calls.
Instance Attribute Summary collapse
-
#failing_checks ⇒ Object
readonly
Returns the value of attribute failing_checks.
Instance Method Summary collapse
- #failed ⇒ Object
- #failing? ⇒ Boolean
-
#initialize(failing_checks: 3) ⇒ LagTracker
constructor
A new instance of LagTracker.
- #update(topic_partitions) ⇒ Object
Constructor Details
#initialize(failing_checks: 3) ⇒ LagTracker
Returns a new instance of LagTracker.
8 9 10 11 12 |
# File 'lib/nulogy_message_bus_consumer/lag_tracker.rb', line 8 def initialize(failing_checks: 3) @failing_checks = failing_checks @tracked = Hash.new { |h, topic| h[topic] = {} } @failed = Hash.new { |h, topic| h[topic] = Set.new } end |
Instance Attribute Details
#failing_checks ⇒ Object (readonly)
Returns the value of attribute failing_checks.
6 7 8 |
# File 'lib/nulogy_message_bus_consumer/lag_tracker.rb', line 6 def failing_checks @failing_checks end |
Instance Method Details
#failed ⇒ Object
26 27 28 |
# File 'lib/nulogy_message_bus_consumer/lag_tracker.rb', line 26 def failed @failed.transform_values { |v| v.to_a.sort } end |
#failing? ⇒ Boolean
22 23 24 |
# File 'lib/nulogy_message_bus_consumer/lag_tracker.rb', line 22 def failing? @failed.any? end |
#update(topic_partitions) ⇒ Object
14 15 16 17 18 19 20 |
# File 'lib/nulogy_message_bus_consumer/lag_tracker.rb', line 14 def update(topic_partitions) topic_partitions.each_pair do |topic, partitions| partitions.each_pair do |partition, value| update_topic_partition(topic, partition, value) end end end |