Class: Kafka::Prometheus::ConsumerSubscriber

Inherits: ActiveSupport::Subscriber
Defined in: lib/kafka/prometheus.rb

Instance Method Summary

Constructor Details

#initialize ⇒ ConsumerSubscriber

Returns a new instance of ConsumerSubscriber.



# File 'lib/kafka/prometheus.rb', line 70

def initialize
  super
  @process_messages = Prometheus.registry.counter(:consumer_process_messages, docstring: 'Total messages', labels: [:client, :group_id, :topic, :partition])
  @process_message_errors = Prometheus.registry.counter(:consumer_process_message_errors, docstring: 'Total errors', labels: [:client, :group_id, :topic, :partition])
  @process_message_latency =
    Prometheus.registry.histogram(:consumer_process_message_latency, docstring: 'Latency', buckets: LATENCY_BUCKETS, labels: [:client, :group_id, :topic, :partition])
  @offset_lag = Prometheus.registry.gauge(:consumer_offset_lag, docstring: 'Offset lag', labels: [:client, :group_id, :topic, :partition])
  @time_lag = Prometheus.registry.gauge(:consumer_time_lag, docstring: 'Time lag of message', labels: [:client, :group_id, :topic, :partition])
  @process_batch_errors = Prometheus.registry.counter(:consumer_process_batch_errors, docstring: 'Total errors in batch', labels: [:client, :group_id, :topic, :partition])
  @process_batch_latency =
    Prometheus.registry.histogram(:consumer_process_batch_latency, docstring: 'Latency in batch', buckets: LATENCY_BUCKETS, labels: [:client, :group_id, :topic, :partition])
  @batch_size = Prometheus.registry.histogram(:consumer_batch_size, docstring: 'Size of batch', buckets: SIZE_BUCKETS, labels: [:client, :group_id, :topic, :partition])
  @join_group = Prometheus.registry.histogram(:consumer_join_group, docstring: 'Time to join group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
  @join_group_errors = Prometheus.registry.counter(:consumer_join_group_errors, docstring: 'Total error in joining group', labels: [:client, :group_id])
  @sync_group = Prometheus.registry.histogram(:consumer_sync_group, docstring: 'Time to sync group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
  @sync_group_errors = Prometheus.registry.counter(:consumer_sync_group_errors, docstring: 'Total error in syncing group', labels: [:client, :group_id])
  @leave_group = Prometheus.registry.histogram(:consumer_leave_group, docstring: 'Time to leave group', buckets: DELAY_BUCKETS, labels: [:client, :group_id])
  @leave_group_errors = Prometheus.registry.counter(:consumer_leave_group_errors, docstring: 'Total error in leaving group', labels: [:client, :group_id])
  @pause_duration = Prometheus.registry.gauge(:consumer_pause_duration, docstring: 'Pause duration', labels: [:client, :group_id, :topic, :partition])
end
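
The constructor registers every consumer metric with the shared registry exposed as Kafka::Prometheus.registry. Below is a minimal sketch of enabling the integration in an application, assuming the standard setup in which requiring kafka/prometheus registers the metrics and attaches the subscribers, and the prometheus-client Rack exporter serves the registry; broker address, client id, group id and topic are illustrative, not part of this class.

require "kafka"
require "kafka/prometheus"                # registers the metrics and attaches the subscribers
require "prometheus/middleware/exporter"  # prometheus-client Rack exporter

kafka = Kafka.new(["kafka1:9092"], client_id: "my-app")
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("my-topic")

# In config.ru, expose the collected metrics at /metrics:
#   use Prometheus::Middleware::Exporter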

Instance Method Details

#fetch_batch(event) ⇒ Object



# File 'lib/kafka/prometheus.rb', line 136

def fetch_batch(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }
  offset_lag = event.payload.fetch(:offset_lag)
  batch_size = event.payload.fetch(:message_count)

  @batch_size.observe(batch_size, labels: key)
  @offset_lag.set(offset_lag, labels: key)
end
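
The handler reads the partition-level key plus offset_lag and message_count from the event payload, observes the batch-size histogram and sets the offset-lag gauge. The sketch below shows the expected payload shape by emitting the event by hand, purely for illustration; in normal operation ruby-kafka's instrumenter publishes this event, and the sketch assumes the subscriber is attached to the consumer.kafka notification namespace. All label values are made up.

require "active_support/notifications"

payload = {
  client_id: "my-app",
  group_id: "my-group",
  topic: "my-topic",
  partition: 0,
  offset_lag: 42,      # written to the :consumer_offset_lag gauge
  message_count: 100   # observed by the :consumer_batch_size histogram
}

ActiveSupport::Notifications.instrument("fetch_batch.consumer.kafka", payload)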

#join_group(event) ⇒ Object



# File 'lib/kafka/prometheus.rb', line 150

def join_group(event)
  key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
  @join_group.observe(event.duration, labels: key)

  @join_group_errors.increment(labels: key) if event.payload.key?(:exception)
end
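
event.duration (milliseconds, as reported by ActiveSupport::Notifications) feeds the :consumer_join_group histogram, and the error counter is only incremented when the payload carries an :exception; #sync_group and #leave_group below follow the same pattern. A hedged sketch of reading the recorded values back through the prometheus-client registry API; label values are illustrative.

require "kafka/prometheus"  # assumes the metrics have been registered

labels = { client: "my-app", group_id: "my-group" }

# Histogram#get returns the bucket counts for one label set.
join_buckets = Kafka::Prometheus.registry.get(:consumer_join_group).get(labels: labels)

# Counter#get returns the current value for one label set.
join_errors = Kafka::Prometheus.registry.get(:consumer_join_group_errors).get(labels: labels)

puts join_buckets.inspect
puts join_errors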

#leave_group(event) ⇒ Object



# File 'lib/kafka/prometheus.rb', line 164

def leave_group(event)
  key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
  @leave_group.observe(event.duration, labels: key)

  @leave_group_errors.increment(labels: key) if event.payload.key?(:exception)
end

#pause_status(event) ⇒ Object



# File 'lib/kafka/prometheus.rb', line 171

def pause_status(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }

  duration = event.payload.fetch(:duration)
  @pause_duration.set(duration, labels: key)
end

#process_batch(event) ⇒ Object



# File 'lib/kafka/prometheus.rb', line 119

def process_batch(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }
  message_count = event.payload.fetch(:message_count)

  if event.payload.key?(:exception)
    @process_batch_errors.increment(labels: key)
  else
    @process_batch_latency.observe(event.duration, labels: key)
    @process_messages.increment(by: message_count, labels: key)
  end
end
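
On success the batch latency is observed and the shared :consumer_process_messages counter grows by message_count; when the payload carries an :exception, only :consumer_process_batch_errors is incremented. The payloads below are hand-emitted purely to illustrate the two branches; during real consumption ruby-kafka publishes these events itself, and the label values are made up.

require "active_support/notifications"

key = { client_id: "my-app", group_id: "my-group", topic: "my-topic", partition: 0 }

# Success path: latency observed, message counter incremented by message_count.
ActiveSupport::Notifications.instrument("process_batch.consumer.kafka",
                                         key.merge(message_count: 50))

# Failure path: only the batch error counter is incremented.
ActiveSupport::Notifications.instrument("process_batch.consumer.kafka",
                                         key.merge(message_count: 50,
                                                   exception: ["StandardError", "boom"]))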

#process_message(event) ⇒ Object



# File 'lib/kafka/prometheus.rb', line 91

def process_message(event)
  key = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition)
  }

  offset_lag = event.payload.fetch(:offset_lag)
  create_time = event.payload.fetch(:create_time)

  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  if event.payload.key?(:exception)
    @process_message_errors.increment(labels: key)
  else
    @process_message_latency.observe(event.duration, labels: key)
    @process_messages.increment(labels: key)
  end

  @offset_lag.set(offset_lag, labels: key)

  # Not all messages have timestamps.
  return unless time_lag

  @time_lag.set(time_lag, labels: key)
end
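
Offset lag is recorded unconditionally, while time lag is only set when the message carries a :create_time timestamp (the lag being the wall-clock difference in milliseconds). A short sketch of reading the per-partition gauges back from the registry via the prometheus-client API; label values are illustrative.

require "kafka/prometheus"  # assumes the metrics have been registered

labels = { client: "my-app", group_id: "my-group", topic: "my-topic", partition: 0 }

offset_lag = Kafka::Prometheus.registry.get(:consumer_offset_lag).get(labels: labels)
time_lag_ms = Kafka::Prometheus.registry.get(:consumer_time_lag).get(labels: labels)

puts "offset lag: #{offset_lag} messages, time lag: #{time_lag_ms} ms"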

#sync_group(event) ⇒ Object



# File 'lib/kafka/prometheus.rb', line 157

def sync_group(event)
  key = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id) }
  @sync_group.observe(event.duration, labels: key)

  @sync_group_errors.increment(labels: key) if event.payload.key?(:exception)
end