Class: Honeybadger::Karafka::InsightsListener
- Inherits:
-
Object
- Object
- Honeybadger::Karafka::InsightsListener
- Includes:
- InstrumentationHelper
- Defined in:
- lib/honeybadger/karafka.rb
Defined Under Namespace
Classes: RdKafkaMetric
Constant Summary collapse
- RD_KAFKA_METRICS =
All the rdkafka metrics we want to publish
By default we publish quite a lot so this can be tuned Note, that the once with ‘_d` come from Karafka, not rdkafka or Kafka
RdKafkaMetric.new(:increment_counter, :root, 'messages_consumed', 'rxmsgs_d'), RdKafkaMetric.new(:increment_counter, :root, 'messages_consumed_bytes', 'rxmsg_bytes'), # Broker metrics RdKafkaMetric.new(:increment_counter, :brokers, 'consume_attempts', 'txretries_d'), RdKafkaMetric.new(:increment_counter, :brokers, 'consume_errors', 'txerrs_d'), RdKafkaMetric.new(:increment_counter, :brokers, 'receive_errors', 'rxerrs_d'), RdKafkaMetric.new(:increment_counter, :brokers, 'connection_connects', 'connects_d'), RdKafkaMetric.new(:increment_counter, :brokers, 'connection_disconnects', 'disconnects_d'), RdKafkaMetric.new(:gauge, :brokers, 'network_latency_avg', %w[rtt avg]), RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p95', %w[rtt p95]), RdKafkaMetric.new(:gauge, :brokers, 'network_latency_p99', %w[rtt p99]), # Topics metrics RdKafkaMetric.new(:gauge, :topics, 'consumer_lags', 'consumer_lag_stored'), RdKafkaMetric.new(:gauge, :topics, 'consumer_lags_delta', 'consumer_lag_stored_d') ].freeze
- AGGREGATED_RD_KAFKA_METRICS =
Metrics that sum values on topics levels and not on partition levels
[ # Topic aggregated metrics RdKafkaMetric.new(:gauge, :topics, 'consumer_aggregated_lag', 'consumer_lag_stored') ].freeze
Instance Method Summary collapse
-
#initialize ⇒ InsightsListener
constructor
A new instance of InsightsListener.
-
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time did we spend on it.
-
#on_consumer_consumed(event) ⇒ Object
Here we report majority of things related to processing as we have access to the consumer.
-
#on_error_occurred(event) ⇒ Object
Increases the errors count by 1.
-
#on_statistics_emitted(event) ⇒ Object
Hooks up to Karafka instrumentation for emitted statistics.
-
#on_worker_process(event) ⇒ Object
Worker related metrics.
-
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy Without this, the utilization would not be fully reflected.
-
#report_aggregated_topics_metrics(statistics, consumer_group_id) ⇒ Object
Publishes aggregated topic-level metrics that are sum of per partition metrics.
Methods included from InstrumentationHelper
#decrement_counter, #extract_attributes, #extract_callable, #gauge, #histogram, #increment_counter, #metric_agent, #metric_attributes, #metric_instrumentation, #metric_source, #monotonic_timer, #time
Constructor Details
#initialize ⇒ InsightsListener
Returns a new instance of InsightsListener.
65 66 67 |
# File 'lib/honeybadger/karafka.rb', line 65 def initialize metric_source("karafka") end |
Instance Method Details
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time did we spend on it
142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/honeybadger/karafka.rb', line 142 def on_connection_listener_fetch_loop_received(event) time_taken = event[:time] = event[:messages_buffer].size consumer_group_id = event[:subscription_group].consumer_group.id = { consumer_group: consumer_group_id } if Honeybadger.config.load_plugin_insights_metrics?(:karafka) histogram('listener_polling_time_taken', value: time_taken, **) histogram('listener_polling_messages', value: , **) end end |
#on_consumer_consumed(event) ⇒ Object
Here we report majority of things related to processing as we have access to the consumer
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/honeybadger/karafka.rb', line 157 def on_consumer_consumed(event) consumer = event.payload[:caller] = consumer. = . = (consumer) if Honeybadger.config.load_plugin_insights_events?(:karafka) event_context = .merge({ consumer: consumer.class.name, duration: event[:time], processing_lag: .processing_lag, consumption_lag: .consumption_lag, processed: .count }) Honeybadger.event("consumer.consumed.karafka", event_context) end if Honeybadger.config.load_plugin_insights_metrics?(:karafka) increment_counter('consumer_messages', value: .count, **) increment_counter('consumer_batches', value: 1, **) gauge('consumer_offset', value: .last_offset, **) histogram('consumer_consumed_time_taken', value: event[:time], **) histogram('consumer_batch_size', value: .count, **) histogram('consumer_processing_lag', value: .processing_lag, **) histogram('consumer_consumption_lag', value: .consumption_lag, **) end end |
#on_error_occurred(event) ⇒ Object
Increases the errors count by 1
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/honeybadger/karafka.rb', line 123 def on_error_occurred(event) = { type: event[:type] } if event.payload[:caller].respond_to?(:messages) .merge!((event.payload[:caller])) end if Honeybadger.config.load_plugin_insights_events?(:karafka) Honeybadger.event("error.occurred.karafka", error: event[:error], **) end if Honeybadger.config.load_plugin_insights_metrics?(:karafka) increment_counter('error_occurred', value: 1, **) end end |
#on_statistics_emitted(event) ⇒ Object
Hooks up to Karafka instrumentation for emitted statistics
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/honeybadger/karafka.rb', line 72 def on_statistics_emitted(event) if Honeybadger.config.load_plugin_insights_events?(:karafka) Honeybadger.event("statistics_emitted.karafka", event.payload) end return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka) statistics = event[:statistics] consumer_group_id = event[:consumer_group_id] = { consumer_group: consumer_group_id } RD_KAFKA_METRICS.each do |metric| report_metric(metric, statistics, ) end report_aggregated_topics_metrics(statistics, consumer_group_id) end |
#on_worker_process(event) ⇒ Object
Worker related metrics
206 207 208 209 210 211 212 213 214 |
# File 'lib/honeybadger/karafka.rb', line 206 def on_worker_process(event) jq_stats = event[:jobs_queue].statistics if Honeybadger.config.load_plugin_insights_metrics?(:karafka) gauge('worker_total_threads', value: ::Karafka::App.config.concurrency) histogram('worker_processing', value: jq_stats[:busy]) histogram('worker_enqueued_jobs', value: jq_stats[:enqueued]) end end |
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy Without this, the utilization would not be fully reflected
219 220 221 222 223 224 225 |
# File 'lib/honeybadger/karafka.rb', line 219 def on_worker_processed(event) jq_stats = event[:jobs_queue].statistics if Honeybadger.config.load_plugin_insights_metrics?(:karafka) histogram('worker_processing', value: jq_stats[:busy]) end end |
#report_aggregated_topics_metrics(statistics, consumer_group_id) ⇒ Object
Publishes aggregated topic-level metrics that are sum of per partition metrics
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/honeybadger/karafka.rb', line 95 def report_aggregated_topics_metrics(statistics, consumer_group_id) AGGREGATED_RD_KAFKA_METRICS.each do |metric| statistics.fetch('topics').each do |topic_name, topic_values| sum = 0 topic_values['partitions'].each do |partition_name, partition_statistics| next if partition_name == '-1' # Skip until lag info is available next if partition_statistics['consumer_lag'] == -1 next if partition_statistics['consumer_lag_stored'] == -1 sum += partition_statistics.dig(*metric.key_location) end public_send( metric.type, metric.name, value: sum, consumer_group: consumer_group_id, topic: topic_name ) end end end |