Class: Karafka::Instrumentation::Vendors::Datadog::MetricsListener
- Inherits:
- Object
- Extended by:
- Forwardable
- Includes:
- Core::Configurable
- Defined in:
- lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb
Overview
Listener that can be subscribed to Karafka to receive stats via StatsD and/or Datadog.
Note: you need to set up the `dogstatsd-ruby` client and assign it to the listener.
Defined Under Namespace
Classes: RdKafkaMetric
Instance Method Summary
-
#initialize(&block) ⇒ MetricsListener
constructor
A new instance of MetricsListener.
-
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time we spent on it.
-
#on_consumer_consumed(event) ⇒ Object
Reports the majority of processing-related metrics, since we have access to the consumer here.
-
#on_error_occurred(event) ⇒ Object
Increases the errors count by 1.
-
#on_statistics_emitted(event) ⇒ Object
Hooks up to Karafka instrumentation for emitted statistics.
-
#on_worker_process(event) ⇒ Object
Worker-related metrics.
-
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy. Without this, the utilization would not be fully reflected.
-
#setup(&block) ⇒ Object
Alias defined to be consistent with `WaterDrop#setup`.
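Each `on_*` method listed above is called when the matching Karafka event is published: the method name is the event name with dots replaced by underscores, so `consumer.consumed` maps to `#on_consumer_consumed`. The following is a minimal sketch of that dispatch. `SketchMonitor` and `RecordingListener` are hypothetical stand-ins written for illustration; they are not part of Karafka, and the real monitor does more than this.

```ruby
# Hypothetical stand-in for Karafka.monitor: maps event names to listener methods.
class SketchMonitor
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  # Publishes an event to every listener that defines the matching on_* method.
  def instrument(event_name, payload = {})
    method_name = "on_#{event_name.tr('.', '_')}"

    @listeners.each do |listener|
      listener.public_send(method_name, payload) if listener.respond_to?(method_name)
    end
  end
end

# Hypothetical listener that only reacts to two events.
class RecordingListener
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_error_occurred(event)
    @seen << [:error, event[:type]]
  end

  def on_worker_processed(_event)
    @seen << [:worker_processed]
  end
end

monitor = SketchMonitor.new
listener = RecordingListener.new
monitor.subscribe(listener)

monitor.instrument('error.occurred', { type: 'consumer.consume.error' })
monitor.instrument('worker.processed')
monitor.instrument('consumer.consumed') # ignored: the listener has no matching method
```

A listener therefore only needs to define methods for the events it cares about; events without a matching method are skipped.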
Constructor Details
#initialize(&block) ⇒ MetricsListener
Returns a new instance of MetricsListener.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 67
    def initialize(&block)
      configure
      setup(&block) if block
    end
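The constructor applies defaults first and then, if a block is given, hands the configuration to that block. A minimal sketch of this pattern follows. `SketchListener` is a hypothetical stand-in that uses a plain `Struct` instead of `Core::Configurable`, and the setting names `client`, `namespace` and `default_tags` are assumptions for illustration, since this page does not list the settings.

```ruby
# Hypothetical stand-in for the listener's constructor and #setup.
class SketchListener
  # Assumed setting names; the real listener defines its settings via Core::Configurable.
  Config = Struct.new(:client, :namespace, :default_tags)

  attr_reader :config

  def initialize(&block)
    # Defaults are applied first, mirroring the bare `configure` call.
    @config = Config.new(nil, 'karafka', [])
    # The block is optional; configuration can also happen later via #setup.
    setup(&block) if block
  end

  def setup
    yield(config)
  end
end

# Placeholder object standing in for a configured dogstatsd-ruby client.
statsd_client = Object.new

# Configure at construction time...
listener = SketchListener.new do |config|
  config.client = statsd_client
  config.default_tags = ['env:test']
end

# ...or create first and configure later.
deferred = SketchListener.new
deferred.setup { |config| config.namespace = 'my_app' }
```

Both forms end up in the same place, which is why `#setup` exists as a thin wrapper.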
Instance Method Details
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time we spent on it.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 108
    def on_connection_listener_fetch_loop_received(event)
      time_taken = event[:time]
      messages_count = event[:messages_buffer].size

      consumer_group_id = event[:subscription_group].consumer_group.id

      extra_tags = ["consumer_group:#{consumer_group_id}"]

      histogram('listener.polling.time_taken', time_taken, tags: default_tags + extra_tags)
      histogram('listener.polling.messages', messages_count, tags: default_tags + extra_tags)
    end
#on_consumer_consumed(event) ⇒ Object
Reports the majority of processing-related metrics, since we have access to the consumer here.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 123
    def on_consumer_consumed(event)
      consumer = event.payload[:caller]
      messages = consumer.messages
      metadata = messages.metadata

      tags = default_tags + consumer_tags(consumer)

      count('consumer.messages', messages.count, tags: tags)
      count('consumer.batches', 1, tags: tags)
      gauge('consumer.offset', metadata.last_offset, tags: tags)
      histogram('consumer.consumed.time_taken', event[:time], tags: tags)
      histogram('consumer.batch_size', messages.count, tags: tags)
      histogram('consumer.processing_lag', metadata.processing_lag, tags: tags)
      histogram('consumer.consumption_lag', metadata.consumption_lag, tags: tags)
    end
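The method above builds one tag list per batch and reuses it for every metric it reports. A minimal sketch of that idea follows. `RecordingClient`, `BatchMetadata` and `report_batch` are hypothetical names introduced for illustration, and the `topic:` and `partition:` tags are an assumption about what consumer-specific tags might look like; the listener's own tag helper is not shown on this page.

```ruby
# Stand-in for a StatsD client; records calls instead of sending them anywhere.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  %i[count gauge histogram].each do |type|
    define_method(type) do |name, value, tags: []|
      @calls << [type, name, value, tags]
    end
  end
end

# Hypothetical shape of batch metadata, limited to the fields used below.
BatchMetadata = Struct.new(:topic, :partition, :last_offset, keyword_init: true)

# Builds the tags once and reuses them for every metric of the batch.
def report_batch(client, default_tags, metadata, messages_count, time_taken)
  # Assumed tag format: default tags plus topic and partition of the batch.
  tags = default_tags + ["topic:#{metadata.topic}", "partition:#{metadata.partition}"]

  client.count('consumer.messages', messages_count, tags: tags)
  client.count('consumer.batches', 1, tags: tags)
  client.gauge('consumer.offset', metadata.last_offset, tags: tags)
  client.histogram('consumer.consumed.time_taken', time_taken, tags: tags)
end

client = RecordingClient.new
metadata = BatchMetadata.new(topic: 'events', partition: 0, last_offset: 41)
report_batch(client, ['env:test'], metadata, 10, 12.5)
```

The metric types are chosen by what the value means: counts accumulate over time, a gauge keeps the latest value (the last offset), and a histogram captures the distribution of per-batch values such as time taken.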
#on_error_occurred(event) ⇒ Object
Increases the errors count by 1.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 95
    def on_error_occurred(event)
      extra_tags = ["type:#{event[:type]}"]

      if event.payload[:caller].respond_to?(:messages)
        extra_tags += consumer_tags(event.payload[:caller])
      end

      count('error_occurred', 1, tags: default_tags + extra_tags)
    end
#on_statistics_emitted(event) ⇒ Object
Hooks up to Karafka instrumentation for emitted statistics.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 81
    def on_statistics_emitted(event)
      statistics = event[:statistics]
      consumer_group_id = event[:consumer_group_id]

      base_tags = default_tags + ["consumer_group:#{consumer_group_id}"]

      rd_kafka_metrics.each do |metric|
        report_metric(metric, statistics, base_tags)
      end
    end
#on_worker_process(event) ⇒ Object
Reports worker-related metrics.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 158
    def on_worker_process(event)
      jq_stats = event[:jobs_queue].statistics

      gauge('worker.total_threads', Karafka::App.config.concurrency, tags: default_tags)
      histogram('worker.processing', jq_stats[:busy], tags: default_tags)
      histogram('worker.enqueued_jobs', jq_stats[:enqueued], tags: default_tags)
    end
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy. Without this, the utilization would not be fully reflected.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 169
    def on_worker_processed(event)
      jq_stats = event[:jobs_queue].statistics

      histogram('worker.processing', jq_stats[:busy], tags: default_tags)
    end
#setup(&block) ⇒ Object
We define this alias to be consistent with `WaterDrop#setup`.
    # File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 74
    def setup(&block)
      configure(&block)
    end