Class: Honeybadger::Karafka::InsightsListener

Inherits:
Object
Includes:
InstrumentationHelper
Defined in:
lib/honeybadger/karafka.rb

Defined Under Namespace

Classes: RdKafkaMetric

Constant Summary

RD_KAFKA_METRICS =

All the rdkafka metrics we want to publish.

By default we publish quite a lot, so this can be tuned. Note that the ones with `_d` come from Karafka, not rdkafka or Kafka.

[
  RdKafkaMetric.new(:increment_counter, :root, 'messages_consumed', 'rxmsgs_d'),
  RdKafkaMetric.new(:increment_counter, :root, 'messages_consumed_bytes', 'rxmsg_bytes'),

  # Broker metrics
  RdKafkaMetric.new(:increment_counter, :brokers, 'consume_attempts', 'txretries_d'),
  RdKafkaMetric.new(:increment_counter, :brokers, 'consume_errors', 'txerrs_d'),
  RdKafkaMetric.new(:increment_counter, :brokers, 'receive_errors', 'rxerrs_d'),
  RdKafkaMetric.new(:increment_counter, :brokers, 'connection_connects', 'connects_d'),
  RdKafkaMetric.new(:increment_counter, :brokers, 'connection_disconnects', 'disconnects_d'),
  RdKafkaMetric.new(:gauge, :brokers, 'network_latency_avg', %w[rtt avg]),
  RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p95', %w[rtt p95]),
  RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p99', %w[rtt p99]),

  # Topics metrics
  RdKafkaMetric.new(:gauge, :topics, 'consumer_lags', 'consumer_lag_stored'),
  RdKafkaMetric.new(:gauge, :topics, 'consumer_lags_delta', 'consumer_lag_stored_d')
].freeze
AGGREGATED_RD_KAFKA_METRICS =

Metrics that sum values at the topic level rather than at the partition level

[
  # Topic aggregated metrics
  RdKafkaMetric.new(:gauge, :topics, 'consumer_aggregated_lag', 'consumer_lag_stored')
].freeze
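
The listener is a standard Karafka monitor subscriber. A minimal wiring sketch, assuming Karafka's documented Karafka.monitor.subscribe API (where the call lives in your boot file is up to you):

# In karafka.rb, after the Karafka app is defined:
Karafka.monitor.subscribe(
  Honeybadger::Karafka::InsightsListener.new
)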

Instance Method Summary

Methods included from InstrumentationHelper

#decrement_counter, #extract_attributes, #extract_callable, #gauge, #histogram, #increment_counter, #metric_agent, #metric_attributes, #metric_instrumentation, #metric_source, #monotonic_timer, #time

Constructor Details

#initialize ⇒ InsightsListener

Returns a new instance of InsightsListener.



# File 'lib/honeybadger/karafka.rb', line 65

def initialize
  metric_source("karafka")
end

Instance Method Details

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Reports how many messages we’ve polled and how much time we spent on it

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/honeybadger/karafka.rb', line 142

def on_connection_listener_fetch_loop_received(event)
  time_taken = event[:time]
  messages_count = event[:messages_buffer].size
  consumer_group_id = event[:subscription_group].consumer_group.id
  extra_tags = { consumer_group: consumer_group_id }

  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    histogram('listener_polling_time_taken', value: time_taken, **extra_tags)
    histogram('listener_polling_messages', value: messages_count, **extra_tags)
  end
end
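
For reference, the same event can be observed directly. A minimal sketch, assuming Karafka's standard 'connection.listener.fetch_loop.received' event name (which this handler's name maps to by Karafka's naming convention):

Karafka.monitor.subscribe('connection.listener.fetch_loop.received') do |event|
  puts "polled #{event[:messages_buffer].size} messages in #{event[:time]}ms"
end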

#on_consumer_consumed(event) ⇒ Object

Here we report the majority of processing-related metrics, since we have access to the consumer

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/honeybadger/karafka.rb', line 157

def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
  metadata = messages.metadata

  tags = consumer_tags(consumer)

  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    event_context = tags.merge({
      consumer: consumer.class.name,
      duration: event[:time],
      processing_lag: metadata.processing_lag,
      consumption_lag: metadata.consumption_lag,
      processed: messages.count
    })
    Honeybadger.event("consumer.consumed.karafka", event_context)
  end

  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    increment_counter('consumer_messages', value: messages.count, **tags)
    increment_counter('consumer_batches', value: 1, **tags)
    gauge('consumer_offset', value: metadata.last_offset, **tags)
    histogram('consumer_consumed_time_taken', value: event[:time], **tags)
    histogram('consumer_batch_size', value: messages.count, **tags)
    histogram('consumer_processing_lag', value: metadata.processing_lag, **tags)
    histogram('consumer_consumption_lag', value: metadata.consumption_lag, **tags)
  end
end
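
For illustration, the resulting Insights event carries a context shaped roughly like the hash below. The topic and partition keys are an assumption about what consumer_tags returns; all values are made up:

# Hypothetical "consumer.consumed.karafka" event context:
{
  topic: "orders",            # assumed to come from consumer_tags
  partition: 0,               # assumed to come from consumer_tags
  consumer: "OrdersConsumer",
  duration: 12.4,
  processing_lag: 3,
  consumption_lag: 150,
  processed: 10
}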

#on_error_occurred(event) ⇒ Object

Increases the error count by 1

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/honeybadger/karafka.rb', line 123

def on_error_occurred(event)
  extra_tags = { type: event[:type] }

  if event.payload[:caller].respond_to?(:messages)
    extra_tags.merge!(consumer_tags(event.payload[:caller]))
  end

  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    Honeybadger.event("error.occurred.karafka", error: event[:error], **extra_tags)
  end

  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    increment_counter('error_occurred', value: 1, **extra_tags)
  end
end
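
As an illustration, a failed consumer batch would be counted roughly as below. The type value is an assumed example of a Karafka error type; the topic and partition tags appear only when the caller responds to #messages:

increment_counter(
  'error_occurred',
  value: 1,
  type: 'consumer.consume.error', # assumed Karafka error type
  topic: 'orders',                # only when the caller exposes #messages
  partition: 0
)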

#on_statistics_emitted(event) ⇒ Object

Hooks into Karafka’s instrumentation for emitted statistics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/honeybadger/karafka.rb', line 72

def on_statistics_emitted(event)
  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    Honeybadger.event("statistics_emitted.karafka", event.payload)
  end

  return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)

  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]

  base_tags = { consumer_group: consumer_group_id }

  RD_KAFKA_METRICS.each do |metric|
    report_metric(metric, statistics, base_tags)
  end

  report_aggregated_topics_metrics(statistics, consumer_group_id)
end
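
report_metric itself is not shown in this section; the sketch below is an assumption about how a broker-scoped entry resolves against the rdkafka statistics hash, based on the RdKafkaMetric fields used elsewhere in this class (the broker tag name is illustrative):

# Sketch (assumed lookup logic) for one broker-scoped metric:
metric = RdKafkaMetric.new(:gauge, :brokers, 'network_latency_avg', %w[rtt avg])

statistics.fetch('brokers').each do |broker_name, broker_stats|
  value = broker_stats.dig(*metric.key_location) # => broker_stats['rtt']['avg']
  gauge(metric.name, value: value, consumer_group: consumer_group_id, broker: broker_name)
end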

#on_worker_process(event) ⇒ Object

Worker-related metrics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/honeybadger/karafka.rb', line 206

def on_worker_process(event)
  jq_stats = event[:jobs_queue].statistics

  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    gauge('worker_total_threads', value: ::Karafka::App.config.concurrency)
    histogram('worker_processing', value: jq_stats[:busy])
    histogram('worker_enqueued_jobs', value: jq_stats[:enqueued])
  end
end

#on_worker_processed(event) ⇒ Object

We report this metric before and after processing for higher accuracy. Without this, the utilization would not be fully reflected

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/honeybadger/karafka.rb', line 219

def on_worker_processed(event)
  jq_stats = event[:jobs_queue].statistics

  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    histogram('worker_processing', value: jq_stats[:busy])
  end
end
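
Reading the two worker handlers together: utilization can be derived by comparing busy threads against configured concurrency. A minimal sketch of that derivation (an illustration, not part of the listener):

busy  = jq_stats[:busy]                   # threads currently processing
total = ::Karafka::App.config.concurrency # configured worker threads
utilization = busy.to_f / total           # 0.0..1.0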

#report_aggregated_topics_metrics(statistics, consumer_group_id) ⇒ Object

Publishes aggregated topic-level metrics that are the sum of per-partition metrics

Parameters:

  • statistics (Hash)

    hash with all the statistics emitted

  • consumer_group_id (String)

the consumer group in whose context we operate



# File 'lib/honeybadger/karafka.rb', line 95

def report_aggregated_topics_metrics(statistics, consumer_group_id)
  AGGREGATED_RD_KAFKA_METRICS.each do |metric|
    statistics.fetch('topics').each do |topic_name, topic_values|
      sum = 0

      topic_values['partitions'].each do |partition_name, partition_statistics|
        next if partition_name == '-1'
        # Skip until lag info is available
        next if partition_statistics['consumer_lag'] == -1
        next if partition_statistics['consumer_lag_stored'] == -1

        sum += partition_statistics.dig(*metric.key_location)
      end

      public_send(
        metric.type,
        metric.name,
        value: sum,
        consumer_group: consumer_group_id,
        topic: topic_name
      )
    end
  end
end
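
For context, the slice of an rdkafka statistics hash consumed by the aggregation above looks roughly like this (values are made up; '-1' is librdkafka's internal placeholder partition, skipped above):

statistics = {
  'topics' => {
    'orders' => {
      'partitions' => {
        '0'  => { 'consumer_lag' => 5,  'consumer_lag_stored' => 5 },
        '1'  => { 'consumer_lag' => 3,  'consumer_lag_stored' => 3 },
        '-1' => { 'consumer_lag' => -1, 'consumer_lag_stored' => -1 } # skipped
      }
    }
  }
}
# => the consumer_aggregated_lag gauge for 'orders' reports 8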