Class: Karafka::Instrumentation::Vendors::Appsignal::MetricsListener
- Defined in: lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb
Overview
Listener that ships metrics to Appsignal
Defined Under Namespace
Classes: RdKafkaMetric
Instance Method Summary
- #count(key, value, tags) ⇒ Object
  Increments a counter with a namespace key, value and tags.
- #gauge(key, value, tags) ⇒ Object
  Sets the gauge value.
- #on_app_running(_event) ⇒ Object
  Registers the minute-based probe only once the app is running.
- #on_consumer_consume(event) ⇒ Object
  Before each consumption process, starts a transaction associated with it. Also sets some basic metadata about the given consumption that can be useful for debugging.
- #on_consumer_consumed(event) ⇒ Object
  Once consumption is done, bumps the related counters.
- #on_dead_letter_queue_dispatched(event) ⇒ Object
  Counts DLQ dispatches.
- #on_error_occurred(event) ⇒ Object
  Reports on any error that occurs.
- #on_statistics_emitted(event) ⇒ Object
  Hooks up to Karafka instrumentation for emitted statistics.
- #report_aggregated_topics_metrics(statistics, consumer_group_id) ⇒ Object
  Publishes aggregated topic-level metrics that are the sum of per-partition metrics.
- #report_metric(metric, statistics, consumer_group_id) ⇒ Object
  Reports the statistics of a given metric to Appsignal.
Methods inherited from Base
Constructor Details
This class inherits a constructor from Karafka::Instrumentation::Vendors::Appsignal::Base
Instance Method Details
#count(key, value, tags) ⇒ Object
Increments a counter with a namespace key, value and tags
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 263
def count(key, value, tags)
  client.count(
    namespaced_metric(key),
    value,
    tags
  )
end
#gauge(key, value, tags) ⇒ Object
Sets the gauge value
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 276
def gauge(key, value, tags)
  client.gauge(
    namespaced_metric(key),
    value,
    tags
  )
end
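The delegation performed by #count and #gauge can be illustrated in isolation. The following is a minimal sketch, not the library's implementation: FakeClient and SketchListener are hypothetical names, and the body of namespaced_metric is an assumption (a simple prefix), since the real helper is defined elsewhere and its output format is not shown on this page.

```ruby
# Hypothetical stand-in for the Appsignal client; it only records calls.
class FakeClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def count(key, value, tags)
    @calls << [:count, key, value, tags]
  end

  def gauge(key, value, tags)
    @calls << [:gauge, key, value, tags]
  end
end

# Illustrative listener that mirrors only the delegation pattern.
class SketchListener
  def initialize(client, namespace: 'karafka')
    @client = client
    @namespace = namespace
  end

  def count(key, value, tags)
    @client.count(namespaced_metric(key), value, tags)
  end

  def gauge(key, value, tags)
    @client.gauge(namespaced_metric(key), value, tags)
  end

  private

  # Assumption: the namespace is joined to the key with an underscore.
  def namespaced_metric(key)
    "#{@namespace}_#{key}"
  end
end

client = FakeClient.new
listener = SketchListener.new(client)
listener.count('consumer_batches', 1, { topic: 'events' })
listener.gauge('consumer_offsets', 42, { topic: 'events' })
```

Keeping the namespacing in one private helper means every metric name goes through the same transformation, whichever public method emits it.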
#on_app_running(_event) ⇒ Object
Registers the minute-based probe only once the app is running. If the probe were always registered, it would also report on processes that use Karafka but do not run the consumption process.
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 98
def on_app_running(_event)
  return if @probe_registered

  @probe_registered = true

  # Registers the minutely probe for one-every-minute metrics
  client.register_probe(:karafka, -> { minute_probe })
end
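The @probe_registered guard makes registration idempotent: repeated app-running events register the probe only once. A minimal sketch of that behavior, assuming a hypothetical FakeClient that records registrations and a placeholder minute_probe (the real probe body is not shown on this page):

```ruby
# Hypothetical client that records every probe registration.
class FakeClient
  attr_reader :registrations

  def initialize
    @registrations = []
  end

  def register_probe(name, probe)
    @registrations << [name, probe]
  end
end

# Illustrative listener reproducing only the registration guard.
class SketchListener
  def initialize(client)
    @client = client
    @probe_registered = false
  end

  def on_app_running(_event)
    # Later events are ignored once the probe has been registered
    return if @probe_registered

    @probe_registered = true
    @client.register_probe(:karafka, -> { minute_probe })
  end

  private

  # Placeholder for the per-minute reporting logic.
  def minute_probe
    :reported
  end
end

client = FakeClient.new
listener = SketchListener.new(client)
3.times { listener.on_app_running(nil) }
```

After three events the fake client holds a single registration, and calling the stored lambda runs the listener's private minute_probe.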
#on_consumer_consume(event) ⇒ Object
Before each consumption process, starts a transaction associated with it. Also sets some basic metadata about the given consumption that can be useful for debugging.
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 60
def on_consumer_consume(event)
  consumer = event.payload[:caller]

  start_transaction(consumer, 'consume')

  client. = {
    batch_size: consumer..size,
    first_offset: consumer...first_offset,
    last_offset: consumer...last_offset,
    consumer_group: consumer.topic.consumer_group.id,
    topic: consumer.topic.name,
    partition: consumer.partition,
    attempt: consumer.coordinator.pause_tracker.attempt
  }
end
#on_consumer_consumed(event) ⇒ Object
Once we’re done with consumption, we bump counters about that
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 79
def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  = consumer.
  = .

  with_multiple_resolutions(consumer) do ||
    count('consumer_messages', .size, )
    count('consumer_batches', 1, )
    gauge('consumer_offsets', .last_offset, )
  end

  stop_transaction
end
#on_dead_letter_queue_dispatched(event) ⇒ Object
Counts DLQ dispatches
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 133
def on_dead_letter_queue_dispatched(event)
  consumer = event.payload[:caller]

  with_multiple_resolutions(consumer) do ||
    count('consumer_dead', 1, )
  end
end
#on_error_occurred(event) ⇒ Object
Reports on any error that occurs. This also includes non-user related errors originating from the framework.
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 145
def on_error_occurred(event)
  # If this is a user consumption related error, we bump the counters for metrics
  if USER_CONSUMER_ERROR_TYPES.include?(event[:type])
    consumer = event.payload[:caller]

    with_multiple_resolutions(consumer) do ||
      count('consumer_errors', 1, )
    end
  end

  stop_transaction
end
#on_statistics_emitted(event) ⇒ Object
Hooks up to Karafka instrumentation for emitted statistics
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 161
def on_statistics_emitted(event)
  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics, consumer_group_id)
  end

  report_aggregated_topics_metrics(statistics, consumer_group_id)
end
#report_aggregated_topics_metrics(statistics, consumer_group_id) ⇒ Object
Publishes aggregated topic-level metrics that are the sum of per-partition metrics
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 231
def report_aggregated_topics_metrics(statistics, consumer_group_id)
  config.aggregated_rd_kafka_metrics.each do |metric|
    statistics.fetch('topics').each do |topic_name, topic_values|
      sum = 0

      topic_values['partitions'].each do |partition_name, partition_statistics|
        next if partition_name == '-1'
        # Skip until lag info is available
        next if partition_statistics['consumer_lag'] == -1
        next if partition_statistics['consumer_lag_stored'] == -1

        sum += partition_statistics.dig(*metric.key_location)
      end

      public_send(
        metric.type,
        metric.name,
        sum,
        {
          consumer_group: consumer_group_id,
          topic: topic_name
        }
      )
    end
  end
end
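The per-topic summation and its skip rules can be tried on a hand-written statistics hash. This is a minimal sketch under stated assumptions: aggregate_per_topic is a hypothetical helper that returns the sums instead of publishing them, and the sample hash contains only the keys the loop reads, not a full statistics payload.

```ruby
# Hypothetical helper: sums one per-partition value for each topic,
# applying the same skip rules as the listed method.
def aggregate_per_topic(statistics, key_location)
  statistics.fetch('topics').to_h do |topic_name, topic_values|
    sum = 0

    topic_values['partitions'].each do |partition_name, partition_statistics|
      # '-1' is skipped, exactly as in the listed method
      next if partition_name == '-1'
      # Skip until lag info is available
      next if partition_statistics['consumer_lag'] == -1
      next if partition_statistics['consumer_lag_stored'] == -1

      sum += partition_statistics.dig(*key_location)
    end

    [topic_name, sum]
  end
end

# Hand-written sample containing only the keys used above.
statistics = {
  'topics' => {
    'events' => {
      'partitions' => {
        '0' => { 'consumer_lag' => 5, 'consumer_lag_stored' => 7 },
        '1' => { 'consumer_lag' => 3, 'consumer_lag_stored' => 4 },
        '2' => { 'consumer_lag' => -1, 'consumer_lag_stored' => -1 },
        '-1' => { 'consumer_lag' => 0, 'consumer_lag_stored' => 0 }
      }
    }
  }
}

sums = aggregate_per_topic(statistics, ['consumer_lag_stored'])
puts sums.inspect
```

Only partitions '0' and '1' contribute to the sum: partition '2' has no lag information yet and '-1' is excluded by name.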
#report_metric(metric, statistics, consumer_group_id) ⇒ Object
Reports the statistics of a given metric to Appsignal
# File 'lib/karafka/instrumentation/vendors/appsignal/metrics_listener.rb', line 176
def report_metric(metric, statistics, consumer_group_id)
  case metric.scope
  when :root
    # Do nothing on the root metrics as the same metrics are reported in a granular
    # way from other places
    nil
  when :brokers
    statistics.fetch('brokers').each_value do |broker_statistics|
      # Skip bootstrap nodes
      # Bootstrap nodes have nodeid -1, other nodes have positive
      # node ids
      next if broker_statistics['nodeid'] == -1

      public_send(
        metric.type,
        metric.name,
        broker_statistics.dig(*metric.key_location),
        {
          broker: broker_statistics['nodename']
        }
      )
    end
  when :topics
    statistics.fetch('topics').each do |topic_name, topic_values|
      topic_values['partitions'].each do |partition_name, partition_statistics|
        next if partition_name == '-1'
        # Skip until lag info is available
        next if partition_statistics['consumer_lag'] == -1
        next if partition_statistics['consumer_lag_stored'] == -1

        # Skip if we do not own the fetch assignment
        next if partition_statistics['fetch_state'] == 'stopped'
        next if partition_statistics['fetch_state'] == 'none'

        public_send(
          metric.type,
          metric.name,
          partition_statistics.dig(*metric.key_location),
          {
            consumer_group: consumer_group_id,
            topic: topic_name,
            partition: partition_name
          }
        )
      end
    end
  else
    raise ArgumentError, metric.scope
  end
end
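The scope-based dispatch can be sketched without Appsignal by collecting the would-be calls into an array. Everything named here is an assumption for illustration: the Metric struct only mimics the four attributes the method reads (type, name, scope, key_location), collect_metric is a hypothetical helper, and the metric names and statistics values are made up.

```ruby
# Hypothetical struct exposing the attributes read by the dispatch.
Metric = Struct.new(:type, :name, :scope, :key_location)

# Returns [type, name, value, tags] rows instead of publishing them.
def collect_metric(metric, statistics, consumer_group_id)
  rows = []

  case metric.scope
  when :root
    # Root metrics are intentionally not reported
  when :brokers
    statistics.fetch('brokers').each_value do |broker|
      # Bootstrap nodes have nodeid -1
      next if broker['nodeid'] == -1

      rows << [metric.type, metric.name, broker.dig(*metric.key_location),
               { broker: broker['nodename'] }]
    end
  when :topics
    statistics.fetch('topics').each do |topic_name, topic_values|
      topic_values['partitions'].each do |partition_name, partition|
        next if partition_name == '-1'
        next if partition['consumer_lag'] == -1
        next if partition['consumer_lag_stored'] == -1
        # Skip partitions whose fetch assignment we do not own
        next if %w[stopped none].include?(partition['fetch_state'])

        rows << [metric.type, metric.name, partition.dig(*metric.key_location),
                 { consumer_group: consumer_group_id, topic: topic_name,
                   partition: partition_name }]
      end
    end
  else
    raise ArgumentError, metric.scope.to_s
  end

  rows
end

# Made-up sample data containing only the keys used above.
statistics = {
  'brokers' => {
    'bootstrap' => { 'nodeid' => -1, 'nodename' => 'b0:9092', 'rtt' => { 'avg' => 0 } },
    'b1' => { 'nodeid' => 1, 'nodename' => 'b1:9092', 'rtt' => { 'avg' => 1200 } }
  },
  'topics' => {
    'events' => {
      'partitions' => {
        '0' => { 'consumer_lag' => 5, 'consumer_lag_stored' => 5, 'fetch_state' => 'active' },
        '1' => { 'consumer_lag' => 9, 'consumer_lag_stored' => 9, 'fetch_state' => 'stopped' }
      }
    }
  }
}

broker_rows = collect_metric(Metric.new(:gauge, 'latency_avg', :brokers, %w[rtt avg]), statistics, 'group')
topic_rows = collect_metric(Metric.new(:gauge, 'lag', :topics, ['consumer_lag']), statistics, 'group')
```

The key_location array is splatted into dig, so a single metric definition can point at a nested value such as rtt.avg or a top-level one such as consumer_lag.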