Class: Karafka::Instrumentation::Vendors::Datadog::Listener
- Inherits:
-
Object
- Object
- Karafka::Instrumentation::Vendors::Datadog::Listener
- Extended by:
- Forwardable
- Includes:
- Core::Configurable
- Defined in:
- lib/karafka/instrumentation/vendors/datadog/listener.rb
Overview
You need to setup the ‘dogstatsd-ruby` client and assign it
Listener that can be used to subscribe to Karafka to receive stats via StatsD and/or Datadog
Defined Under Namespace
Classes: RdKafkaMetric
Instance Method Summary collapse
-
#initialize(&block) ⇒ Listener
constructor
A new instance of Listener.
-
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time did we spend on it.
-
#on_consumer_consumed(event) ⇒ Object
Here we report majority of things related to processing as we have access to the consumer.
- #on_consumer_revoked(event) ⇒ Object
- #on_consumer_shutdown(event) ⇒ Object
-
#on_error_occurred(event) ⇒ Object
Increases the errors count by 1.
-
#on_statistics_emitted(event) ⇒ Object
Hooks up to WaterDrop instrumentation for emitted statistics.
-
#on_worker_process(event) ⇒ Object
Worker related metrics.
-
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy Without this, the utilization would not be fully reflected.
- #setup(&block) ⇒ Object
Constructor Details
#initialize(&block) ⇒ Listener
Returns a new instance of Listener.
59 60 61 62 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 59 def initialize(&block) configure setup(&block) if block end |
Instance Method Details
#on_connection_listener_fetch_loop_received(event) ⇒ Object
Reports how many messages we’ve polled and how much time did we spend on it
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 100 def on_connection_listener_fetch_loop_received(event) time_taken = event[:time] = event[:messages_buffer].size consumer_group_id = event[:subscription_group].consumer_group_id = ["consumer_group:#{consumer_group_id}"] histogram('listener.polling.time_taken', time_taken, tags: + ) histogram('listener.polling.messages', , tags: + ) end |
#on_consumer_consumed(event) ⇒ Object
Here we report majority of things related to processing as we have access to the consumer
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 115 def on_consumer_consumed(event) consumer = event.payload[:caller] = consumer. = . = + (consumer) count('consumer.messages', .count, tags: ) count('consumer.batches', 1, tags: ) gauge('consumer.offset', .last_offset, tags: ) histogram('consumer.consumed.time_taken', event[:time], tags: ) histogram('consumer.batch_size', .count, tags: ) histogram('consumer.processing_lag', .processing_lag, tags: ) histogram('consumer.consumption_lag', .consumption_lag, tags: ) end |
#on_consumer_revoked(event) ⇒ Object
132 133 134 135 136 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 132 def on_consumer_revoked(event) = + (event.payload[:caller]) count('consumer.revoked', 1, tags: ) end |
#on_consumer_shutdown(event) ⇒ Object
139 140 141 142 143 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 139 def on_consumer_shutdown(event) = + (event.payload[:caller]) count('consumer.shutdown', 1, tags: ) end |
#on_error_occurred(event) ⇒ Object
Increases the errors count by 1
87 88 89 90 91 92 93 94 95 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 87 def on_error_occurred(event) = ["type:#{event[:type]}"] if event.payload[:caller].respond_to?(:messages) += (event.payload[:caller]) end count('error_occurred', 1, tags: + ) end |
#on_statistics_emitted(event) ⇒ Object
Hooks up to WaterDrop instrumentation for emitted statistics
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 73 def on_statistics_emitted(event) statistics = event[:statistics] consumer_group_id = event[:consumer_group_id] = + ["consumer_group:#{consumer_group_id}"] rd_kafka_metrics.each do |metric| report_metric(metric, statistics, ) end end |
#on_worker_process(event) ⇒ Object
Worker related metrics
147 148 149 150 151 152 153 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 147 def on_worker_process(event) jq_stats = event[:jobs_queue].statistics gauge('worker.total_threads', Karafka::App.config.concurrency, tags: ) histogram('worker.processing', jq_stats[:busy], tags: ) histogram('worker.enqueued_jobs', jq_stats[:enqueued], tags: ) end |
#on_worker_processed(event) ⇒ Object
We report this metric before and after processing for higher accuracy Without this, the utilization would not be fully reflected
158 159 160 161 162 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 158 def on_worker_processed(event) jq_stats = event[:jobs_queue].statistics histogram('worker.processing', jq_stats[:busy], tags: ) end |
#setup(&block) ⇒ Object
We define this alias to be consistent with ‘WaterDrop#setup`
66 67 68 |
# File 'lib/karafka/instrumentation/vendors/datadog/listener.rb', line 66 def setup(&block) configure(&block) end |