Class: Deimos::Utils::LagReporter::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/lag_reporter.rb

Overview

A topic that tracks, for each partition, the last known current offset (a hash of partition => offset).

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic_name, group) ⇒ Topic

Returns a new instance of Topic.

Parameters:

  • topic_name (String)
  • group (ConsumerGroup)

54
55
56
57
58
# File 'lib/deimos/utils/lag_reporter.rb', line 54

# Builds a topic tracker with no partition offsets recorded yet.
# @param topic_name [String]
# @param group [ConsumerGroup]
def initialize(topic_name, group)
  # Start with an empty partition => offset map; entries are added later.
  self.partition_current_offsets = {}
  self.consumer_group = group
  self.topic_name = topic_name
end

Instance Attribute Details

#consumer_group ⇒ ConsumerGroup

Returns:

  • (ConsumerGroup)

50
51
52
# File 'lib/deimos/utils/lag_reporter.rb', line 50

# Reader for the consumer group stored on this topic.
# @return [ConsumerGroup]
def consumer_group
  @consumer_group
end

#partition_current_offsets ⇒ Hash<Integer, Integer>

Returns:

  • (Hash<Integer, Integer>)


48
49
50
# File 'lib/deimos/utils/lag_reporter.rb', line 48

# Reader for the partition => last known current offset map.
# @return [Hash<Integer, Integer>]
def partition_current_offsets
  @partition_current_offsets
end

#topic_name ⇒ String

Returns:

  • (String)


46
47
48
# File 'lib/deimos/utils/lag_reporter.rb', line 46

# Reader for the name of this topic.
# @return [String]
def topic_name
  @topic_name
end

Instance Method Details

#assign_current_offset(partition, offset) ⇒ void

This method returns an undefined value.

Parameters:

  • partition (Integer)
  • offset (Integer)


63
64
65
# File 'lib/deimos/utils/lag_reporter.rb', line 63

# Records the latest known offset for a partition.
# The partition is coerced with #to_i so the map is always keyed by Integer.
# @param partition [Integer]
# @param offset [Integer]
# @return [void]
def assign_current_offset(partition, offset)
  partition_key = partition.to_i
  partition_current_offsets.store(partition_key, offset)
end

#compute_lag(partition, offset) ⇒ Integer

Parameters:

  • partition (Integer)
  • offset (Integer)

Returns:

  • (Integer)


70
71
72
73
74
75
76
77
78
79
80
# File 'lib/deimos/utils/lag_reporter.rb', line 70

# Computes the lag for a partition: the last offset reported by a freshly
# created Kafka client minus the given offset.
# @param partition [Integer]
# @param offset [Integer]
# @return [Integer] the lag, or 0 when it could not be computed
def compute_lag(partition, offset)
  kafka = Phobos.create_kafka_client
  newest_offset = kafka.last_offset_for(topic_name, partition)
  newest_offset - offset || 0
rescue StandardError
  # Best effort: any failure is logged at debug level and reported as zero lag.
  Deimos.config.logger.debug("Error computing lag for #{topic_name}, will retry")
  0
end

#report_lag(partition) ⇒ void

This method returns an undefined value.

Parameters:

  • partition (Integer)


84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/deimos/utils/lag_reporter.rb', line 84

# Logs the lag for a partition and, when a metrics backend is configured,
# sends it as the 'consumer_lag' gauge tagged with consumer group, partition
# and topic. Does nothing if no offset has been recorded for the partition.
# @param partition [Integer]
# @return [void]
def report_lag(partition)
  known_offset = partition_current_offsets[partition.to_i]
  # Nothing to report until an offset has been assigned for this partition.
  return unless known_offset

  lag = compute_lag(partition, known_offset)
  group = consumer_group.id
  Deimos.config.logger.debug("Sending lag: #{group}/#{partition}: #{lag}")

  metrics = Deimos.config.metrics
  return if metrics.nil?

  metrics.gauge(
    'consumer_lag',
    lag,
    tags: [
      "consumer_group:#{group}",
      "partition:#{partition}",
      "topic:#{topic_name}"
    ]
  )
end