Class: Deimos::Utils::LagReporter::Topic
- Inherits:
-
Object
- Object
- Deimos::Utils::LagReporter::Topic
- Defined in:
- lib/deimos/utils/lag_reporter.rb
Overview
Topic which has a hash of partition => last known current offsets
Instance Attribute Summary collapse
- #consumer_group ⇒ ConsumerGroup
- #partition_current_offsets ⇒ Hash<Integer, Integer>
- #topic_name ⇒ String
Instance Method Summary collapse
- #assign_current_offset(partition, offset) ⇒ void
- #compute_lag(partition, offset) ⇒ Integer
-
#initialize(topic_name, group) ⇒ Topic
constructor
A new instance of Topic.
- #report_lag(partition) ⇒ void
Constructor Details
#initialize(topic_name, group) ⇒ Topic
Returns a new instance of Topic.
54 55 56 57 58 |
# File 'lib/deimos/utils/lag_reporter.rb', line 54 def initialize(topic_name, group) self.topic_name = topic_name self.consumer_group = group self.partition_current_offsets = {} end |
Instance Attribute Details
#consumer_group ⇒ ConsumerGroup
50 51 52 |
# File 'lib/deimos/utils/lag_reporter.rb', line 50 def consumer_group @consumer_group end |
#partition_current_offsets ⇒ Hash<Integer, Integer>
48 49 50 |
# File 'lib/deimos/utils/lag_reporter.rb', line 48 def partition_current_offsets @partition_current_offsets end |
#topic_name ⇒ String
46 47 48 |
# File 'lib/deimos/utils/lag_reporter.rb', line 46 def topic_name @topic_name end |
Instance Method Details
#assign_current_offset(partition, offset) ⇒ void
This method returns an undefined value.
63 64 65 |
# File 'lib/deimos/utils/lag_reporter.rb', line 63 def assign_current_offset(partition, offset) self.partition_current_offsets[partition.to_i] = offset end |
#compute_lag(partition, offset) ⇒ Integer
70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/deimos/utils/lag_reporter.rb', line 70 def compute_lag(partition, offset) begin client = Phobos.create_kafka_client last_offset = client.last_offset_for(self.topic_name, partition) lag = last_offset - offset rescue StandardError # don't do anything, just wait Deimos.config.logger. debug("Error computing lag for #{self.topic_name}, will retry") end lag || 0 end |
#report_lag(partition) ⇒ void
This method returns an undefined value.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/deimos/utils/lag_reporter.rb', line 84 def report_lag(partition) current_offset = self.partition_current_offsets[partition.to_i] return unless current_offset lag = compute_lag(partition, current_offset) group = self.consumer_group.id Deimos.config.logger. debug("Sending lag: #{group}/#{partition}: #{lag}") Deimos.config.metrics&.gauge('consumer_lag', lag, tags: %W( consumer_group:#{group} partition:#{partition} topic:#{self.topic_name} )) end |