Class: Deimos::Utils::LagReporter::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/lag_reporter.rb

Overview

Topic which has a hash of partition => last known offset lag

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic_name, group) ⇒ Topic



61
62
63
64
65
# File 'lib/deimos/utils/lag_reporter.rb', line 61

def initialize(topic_name, group)
  self.topic_name = topic_name
  self.consumer_group = group
  self.partition_offset_lags = {}
end

Instance Attribute Details

#consumer_groupConsumerGroup



57
58
59
# File 'lib/deimos/utils/lag_reporter.rb', line 57

def consumer_group
  @consumer_group
end

#partition_offset_lagsHash<Integer, Integer>



55
56
57
# File 'lib/deimos/utils/lag_reporter.rb', line 55

def partition_offset_lags
  @partition_offset_lags
end

#topic_nameString



53
54
55
# File 'lib/deimos/utils/lag_reporter.rb', line 53

def topic_name
  @topic_name
end

Instance Method Details

#assign_lag(partition, lag) ⇒ Object



69
70
71
# File 'lib/deimos/utils/lag_reporter.rb', line 69

def assign_lag(partition, lag)
  self.partition_offset_lags[partition.to_i] = lag
end

#compute_lag(partition, offset) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/deimos/utils/lag_reporter.rb', line 75

def compute_lag(partition, offset)
  return if self.partition_offset_lags[partition.to_i]

  begin
    client = Phobos.create_kafka_client
    last_offset = client.last_offset_for(self.topic_name, partition)
    assign_lag(partition, [last_offset - offset, 0].max)
  rescue StandardError # don't do anything, just wait
    Deimos.config.logger.
      debug("Error computing lag for #{self.topic_name}, will retry")
  end
end

#report_lag(partition) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/deimos/utils/lag_reporter.rb', line 89

def report_lag(partition)
  lag = self.partition_offset_lags[partition.to_i]
  return unless lag

  group = self.consumer_group.id
  Deimos.config.logger.
    debug("Sending lag: #{group}/#{partition}: #{lag}")
  Deimos.config.metrics&.gauge('consumer_lag', lag, tags: %W(
                                 consumer_group:#{group}
                                 partition:#{partition}
                                 topic:#{self.topic_name}
                               ))
end