Class: Karafka::Instrumentation::Vendors::Datadog::MetricsListener
- Inherits:
- Object
- Extended by:
- Forwardable
- Includes:
- Core::Configurable
- Defined in:
- lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb
Overview
Listener that can be subscribed to Karafka to receive stats via StatsD and/or Datadog.
Note: You need to set up the `dogstatsd-ruby` client and assign it to the listener.
Defined Under Namespace
Classes: RdKafkaMetric
Instance Method Summary
-
#initialize(&block) ⇒ MetricsListener
constructor
A new instance of MetricsListener.
-
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time we spent on it.
-
#on_consumer_consumed(event) ⇒ Object
Here we report the majority of things related to processing, as we have access to the consumer.
-
#on_error_occurred(event) ⇒ Object
Increases the error count by 1.
-
#on_statistics_emitted(event) ⇒ Object
Hooks up to Karafka instrumentation for emitted statistics.
-
#on_worker_process(event) ⇒ Object
Worker-related metrics.
-
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy. Without this, the utilization would not be fully reflected.
-
#setup(&block) ⇒ Object
We define this alias to be consistent with `WaterDrop#setup`.
Constructor Details
#initialize(&block) ⇒ MetricsListener
Returns a new instance of MetricsListener.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 67
def initialize(&block)
  configure
  setup(&block) if block
end
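The constructor applies the defaults first and then runs the optional block through `setup`. The following is a minimal sketch of that pattern, not the library's implementation: `SketchConfig`, `SketchListener`, the setting names, and the default values are hypothetical stand-ins, since the real class gets its settings from `Core::Configurable`.

```ruby
# Hypothetical stand-in for the listener settings. The real class relies on
# Core::Configurable, which this sketch does not reproduce.
SketchConfig = Struct.new(:client, :namespace, :default_tags)

class SketchListener
  attr_reader :config

  # Mirrors the documented constructor: apply defaults, then the optional block.
  def initialize(&block)
    configure
    setup(&block) if block
  end

  # Forwards the block to #configure, so it can also be called after creation.
  def setup(&block)
    configure(&block)
  end

  private

  # Builds the defaults once and yields the config object when a block is given.
  def configure
    @config ||= SketchConfig.new(nil, 'karafka', [])
    yield(@config) if block_given?
    @config
  end
end

listener = SketchListener.new do |config|
  config.client = :statsd_client_placeholder # assumed: a StatsD client goes here
  config.default_tags = ['env:test']
end

puts listener.config.default_tags.inspect
```

Because `setup` only forwards to `configure`, the same block style works both at construction time and later on an existing instance.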
Instance Method Details
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time we spent on it.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 110
def on_connection_listener_fetch_loop_received(event)
  time_taken = event[:time]
  messages_count = event[:messages_buffer].size

  consumer_group_id = event[:subscription_group].consumer_group.id

  tags = ["consumer_group:#{consumer_group_id}"]
  tags.concat(default_tags)

  histogram('listener.polling.time_taken', time_taken, tags: tags)
  histogram('listener.polling.messages', messages_count, tags: tags)
end
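The tag list for each event starts from a fresh array holding the consumer group tag, and the shared tags are appended with `concat`. A small sketch of why that order matters, assuming a hypothetical frozen `DEFAULT_TAGS` array and a hypothetical `polling_tags` helper: `concat` mutates its receiver, so only the per-event array changes and the shared defaults stay intact.

```ruby
# Hypothetical default tags; in the listener these would come from its configuration.
DEFAULT_TAGS = ['env:test', 'service:orders'].freeze

# Builds the tag list for one polling event without touching DEFAULT_TAGS.
def polling_tags(consumer_group_id, default_tags = DEFAULT_TAGS)
  tags = ["consumer_group:#{consumer_group_id}"]
  tags.concat(default_tags) # mutates only the freshly built array
  tags
end

first = polling_tags('example_group')
second = polling_tags('other_group')

puts first.inspect
puts second.inspect
puts DEFAULT_TAGS.inspect
```

Calling `concat` on the defaults instead would either raise on a frozen array or make the shared list grow with every event.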
#on_consumer_consumed(event) ⇒ Object
Here we report the majority of things related to processing, as we have access to the consumer.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 126
def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
  metadata = messages.metadata

  tags = consumer_tags(consumer)
  tags.concat(default_tags)

  count('consumer.messages', messages.count, tags: tags)
  count('consumer.batches', 1, tags: tags)
  gauge('consumer.offset', metadata.last_offset, tags: tags)
  histogram('consumer.consumed.time_taken', event[:time], tags: tags)
  histogram('consumer.batch_size', messages.count, tags: tags)
  histogram('consumer.processing_lag', metadata.processing_lag, tags: tags)
  histogram('consumer.consumption_lag', metadata.consumption_lag, tags: tags)
end
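To see which metrics one consumed batch produces, it can help to swap the StatsD client for an object that records calls. The sketch below does that under stated assumptions: `RecordingClient`, `Batch`, `BatchMetadata`, and `report_batch` are hypothetical stand-ins, and the sample values are made up for illustration. Only the metric names and the metric types are taken from the method above.

```ruby
# Hypothetical stand-in for a StatsD client: it records calls instead of sending them.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Defines #count, #gauge and #histogram with the same recording behavior.
  %i[count gauge histogram].each do |type|
    define_method(type) do |name, value, tags: []|
      @calls << [type, name, value, tags]
    end
  end
end

# Hypothetical stand-ins for a consumer's message batch and its metadata.
BatchMetadata = Struct.new(:last_offset, :processing_lag, :consumption_lag)
Batch = Struct.new(:messages_count, :metadata)

# Emits the same seven metrics the hook reports for one consumed batch.
def report_batch(client, batch, time_taken, tags)
  meta = batch.metadata
  client.count('consumer.messages', batch.messages_count, tags: tags)
  client.count('consumer.batches', 1, tags: tags)
  client.gauge('consumer.offset', meta.last_offset, tags: tags)
  client.histogram('consumer.consumed.time_taken', time_taken, tags: tags)
  client.histogram('consumer.batch_size', batch.messages_count, tags: tags)
  client.histogram('consumer.processing_lag', meta.processing_lag, tags: tags)
  client.histogram('consumer.consumption_lag', meta.consumption_lag, tags: tags)
end

client = RecordingClient.new
batch = Batch.new(3, BatchMetadata.new(41, 12, 85))
report_batch(client, batch, 7.5, ['topic:example'])
client.calls.each { |call| puts call.inspect }
```

The batch size is reported twice on purpose: once as a running count of messages and once as a histogram, so both totals and per-batch distribution are available.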
#on_error_occurred(event) ⇒ Object
Increases the error count by 1.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 96
def on_error_occurred(event)
  tags = ["type:#{event[:type]}"]
  tags.concat(default_tags)

  if event.payload[:caller].respond_to?(:messages)
    tags.concat(consumer_tags(event.payload[:caller]))
  end

  count('error_occurred', 1, tags: tags)
end
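Errors can originate from objects other than consumers, which is why the consumer-specific tags are added only when the caller responds to `messages`. A minimal sketch of that guard follows. The `consumer_tags` body, the `FakeConsumer` struct, and the tag names are assumptions made for illustration, because the real helper is not shown on this page.

```ruby
# Hypothetical helper: the real consumer_tags implementation is not shown on
# this page, so the tag names below are illustrative only.
def consumer_tags(consumer)
  ["topic:#{consumer.topic_name}", "partition:#{consumer.partition}"]
end

# Hypothetical consumer stand-in; it responds to #messages like a real consumer would.
FakeConsumer = Struct.new(:topic_name, :partition, :messages)

# Builds error tags, adding consumer details only when the origin is consumer-like.
def error_tags(type, origin, default_tags)
  tags = ["type:#{type}"]
  tags.concat(default_tags)
  tags.concat(consumer_tags(origin)) if origin.respond_to?(:messages)
  tags
end

from_consumer = error_tags('consumer.consume.error', FakeConsumer.new('events', 0, []), ['env:test'])
from_other = error_tags('example.error', Object.new, ['env:test'])

puts from_consumer.inspect
puts from_other.inspect
```

Duck typing with `respond_to?` keeps the hook independent of concrete consumer classes.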
#on_statistics_emitted(event) ⇒ Object
Hooks up to Karafka instrumentation for emitted statistics.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 81
def on_statistics_emitted(event)
  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]

  base_tags = ["consumer_group:#{consumer_group_id}"]
  base_tags.concat(default_tags)

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics, base_tags)
  end
end
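The hook walks a list of metric descriptors and reports each one against the emitted statistics hash. One way such a loop could dispatch is sketched below, under loud assumptions: `SketchMetric`, its fields, `SketchReporter`, and the metric names are hypothetical, and the fields of the real `RdKafkaMetric` class and the body of `report_metric` are not shown on this page. Only top-level statistics keys are handled here.

```ruby
# Hypothetical metric descriptor; the fields of the real RdKafkaMetric class
# are not shown on this page.
SketchMetric = Struct.new(:type, :name, :key)

# Hypothetical reporter that records calls instead of sending them to StatsD.
class SketchReporter
  attr_reader :calls

  def initialize
    @calls = []
  end

  def count(name, value, tags:)
    @calls << [:count, name, value, tags]
  end

  def gauge(name, value, tags:)
    @calls << [:gauge, name, value, tags]
  end

  # Looks up each metric's value in the statistics hash and dispatches by type.
  def report_all(metrics, statistics, tags)
    metrics.each do |metric|
      public_send(metric.type, metric.name, statistics.fetch(metric.key), tags: tags)
    end
  end
end

metrics = [
  SketchMetric.new(:count, 'messages.consumed', 'rxmsgs'),
  SketchMetric.new(:gauge, 'queue.size', 'msg_cnt')
]
statistics = { 'rxmsgs' => 120, 'msg_cnt' => 4 } # made-up sample values

reporter = SketchReporter.new
reporter.report_all(metrics, statistics, ['consumer_group:example_group'])
reporter.calls.each { |call| puts call.inspect }
```

Keeping the metric list as data means new statistics can be reported by adding a descriptor rather than another method.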
#on_worker_process(event) ⇒ Object
Worker-related metrics.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 163
def on_worker_process(event)
  jq_stats = event[:jobs_queue].statistics
  tags = default_tags

  gauge('worker.total_threads', Karafka::App.config.concurrency, tags: tags)
  histogram('worker.processing', jq_stats[:busy], tags: tags)
  histogram('worker.enqueued_jobs', jq_stats[:enqueued], tags: tags)
end
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy. Without this, the utilization would not be fully reflected.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 175
def on_worker_processed(event)
  jq_stats = event[:jobs_queue].statistics

  histogram('worker.processing', jq_stats[:busy], tags: default_tags)
end
#setup(&block) ⇒ Object
We define this alias to be consistent with `WaterDrop#setup`.
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 74
def setup(&block)
  configure(&block)
end