Class: Deimos::Utils::LagReporter::Topic
- Inherits:
-
Object
- Object
- Deimos::Utils::LagReporter::Topic
- Defined in:
- lib/deimos/utils/lag_reporter.rb
Overview
Represents a single Kafka topic and holds a hash mapping each partition to its last known offset lag.
Instance Attribute Summary collapse
- #consumer_group ⇒ ConsumerGroup
- #partition_offset_lags ⇒ Hash<Integer, Integer>
- #topic_name ⇒ String
Instance Method Summary collapse
- #assign_lag(partition, lag) ⇒ Object
- #compute_lag(partition, offset) ⇒ Object
-
#initialize(topic_name, group) ⇒ Topic
constructor
A new instance of Topic.
- #report_lag(partition) ⇒ Object
Constructor Details
#initialize(topic_name, group) ⇒ Topic
Returns a new instance of Topic.
61 62 63 64 65 |
# File 'lib/deimos/utils/lag_reporter.rb', line 61
#
# Builds a topic record with no lag recorded for any partition yet.
#
# @param topic_name [String] name of the topic being tracked
# @param group [ConsumerGroup] consumer group this topic is tracked for
def initialize(topic_name, group)
  self.topic_name = topic_name
  self.consumer_group = group
  # Starts empty; entries are added per partition via #assign_lag.
  self.partition_offset_lags = {}
end
Instance Attribute Details
#consumer_group ⇒ ConsumerGroup
57 58 59 |
# File 'lib/deimos/utils/lag_reporter.rb', line 57
#
# Reader for the consumer group this topic is tracked for.
#
# @return [ConsumerGroup]
def consumer_group
  @consumer_group
end
#partition_offset_lags ⇒ Hash<Integer, Integer>
55 56 57 |
# File 'lib/deimos/utils/lag_reporter.rb', line 55
#
# Reader for the partition => last known offset lag mapping.
#
# @return [Hash<Integer, Integer>]
def partition_offset_lags
  @partition_offset_lags
end
#topic_name ⇒ String
53 54 55 |
# File 'lib/deimos/utils/lag_reporter.rb', line 53
#
# Reader for the name of the topic being tracked.
#
# @return [String]
def topic_name
  @topic_name
end
Instance Method Details
#assign_lag(partition, lag) ⇒ Object
69 70 71 |
# File 'lib/deimos/utils/lag_reporter.rb', line 69
#
# Records the lag for one partition, replacing any previous value.
#
# @param partition [#to_i] partition identifier; normalised with +to_i+
#   so that e.g. "3" and 3 address the same entry
# @param lag [Integer] lag value to remember
# @return [Object] the lag that was stored
def assign_lag(partition, lag)
  partition_key = partition.to_i
  self.partition_offset_lags[partition_key] = lag
end
#compute_lag(partition, offset) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/deimos/utils/lag_reporter.rb', line 75
#
# Computes and stores the lag for a partition, unless a lag is already
# recorded for it. The lag is the topic's last offset (as reported by a Kafka
# client obtained from Phobos) minus the given offset, floored at zero.
#
# Failures are deliberately swallowed: the error is logged at debug level and
# nothing is stored, so a later call for the same partition tries again.
#
# @param partition [#to_i] partition identifier
# @param offset [Integer] offset to measure the lag from
def compute_lag(partition, offset)
  # Only the first successful computation counts; afterwards the stored
  # value is maintained through #assign_lag.
  return if self.partition_offset_lags[partition.to_i]

  begin
    client = Phobos.create_kafka_client
    begin
      last_offset = client.last_offset_for(self.topic_name, partition)
      assign_lag(partition, [last_offset - offset, 0].max)
    ensure
      # A fresh client is created on every call, so it must be closed here or
      # its broker connections leak. Nested inside the outer begin so that a
      # failing close is also covered by the rescue below.
      client.close
    end
  rescue StandardError # don't do anything, just wait
    Deimos.config.logger.
      debug("Error computing lag for #{self.topic_name}, will retry")
  end
end
#report_lag(partition) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/deimos/utils/lag_reporter.rb', line 89
#
# Publishes the recorded lag for a partition. Does nothing when no lag is
# recorded for that partition. Otherwise it writes a debug log line and, if a
# metrics backend is configured, sends a 'consumer_lag' gauge tagged with the
# consumer group id, the partition and the topic name.
#
# @param partition [#to_i] partition identifier
def report_lag(partition)
  current_lag = self.partition_offset_lags[partition.to_i]
  return unless current_lag

  group_id = self.consumer_group.id
  Deimos.config.logger.debug("Sending lag: #{group_id}/#{partition}: #{current_lag}")
  # The tag list stays inside the safe-navigated call so it is only built
  # when a metrics backend is actually present.
  Deimos.config.metrics&.gauge(
    'consumer_lag',
    current_lag,
    tags: [
      "consumer_group:#{group_id}",
      "partition:#{partition}",
      "topic:#{self.topic_name}"
    ]
  )
end