Class: Kafka::Prometheus::AsyncProducerSubscriber
- Inherits: ActiveSupport::Subscriber
  - Object
  - ActiveSupport::Subscriber
  - Kafka::Prometheus::AsyncProducerSubscriber
- Defined in:
- lib/kafka/prometheus.rb
Instance Method Summary collapse
- #buffer_overflow(event) ⇒ Object
- #drop_messages(event) ⇒ Object
- #enqueue_message(event) ⇒ Object
- #initialize ⇒ AsyncProducerSubscriber (constructor): A new instance of AsyncProducerSubscriber.
Constructor Details
#initialize ⇒ AsyncProducerSubscriber
Returns a new instance of AsyncProducerSubscriber.
264 265 266 267 268 269 270 |
# File 'lib/kafka/prometheus.rb', line 264
#
# Registers the async producer metrics on +Prometheus.registry+ and keeps a
# handle to each one for the event handlers:
#
# * +@queue_size+       - histogram using +SIZE_BUCKETS+, labelled by client and topic
# * +@queue_fill_ratio+ - histogram with the registry's default buckets, labelled by client and topic
# * +@produce_errors+   - counter labelled by client and topic
# * +@dropped_messages+ - counter labelled by client only
#
# +Prometheus+ and +SIZE_BUCKETS+ are resolved from the enclosing namespace;
# their definitions are outside this listing.
def initialize
  super

  @queue_size = Prometheus.registry.histogram(
    :async_producer_queue_size,
    docstring: 'Queue size',
    buckets: SIZE_BUCKETS,
    labels: [:client, :topic]
  )
  @queue_fill_ratio = Prometheus.registry.histogram(
    :async_producer_queue_fill_ratio,
    docstring: 'Queue fill ratio',
    labels: [:client, :topic]
  )
  @produce_errors = Prometheus.registry.counter(
    :async_producer_produce_errors,
    docstring: 'Producer errors',
    labels: [:client, :topic]
  )
  @dropped_messages = Prometheus.registry.counter(
    :async_producer_dropped_messages,
    docstring: 'Dropped messages',
    labels: [:client]
  )
end
Instance Method Details
#buffer_overflow(event) ⇒ Object
286 287 288 289 |
# File 'lib/kafka/prometheus.rb', line 286
#
# Counts a buffer overflow as one produce error for the client/topic pair
# named in the event payload.
#
# @param event [#payload] event whose payload responds to +fetch+ and
#   carries +:client_id+ and +:topic+
# @raise [KeyError] when the payload is a Hash missing either key
#   (a consequence of using +fetch+ rather than +[]+)
def buffer_overflow(event)
  key = {
    client: event.payload.fetch(:client_id),
    topic: event.payload.fetch(:topic)
  }

  @produce_errors.increment(labels: key)
end
#drop_messages(event) ⇒ Object
291 292 293 294 295 |
# File 'lib/kafka/prometheus.rb', line 291
#
# Adds the number of dropped messages reported by the event to the
# dropped-messages counter for the originating client.
#
# @param event [#payload] event whose payload responds to +fetch+ and
#   carries +:client_id+ and +:message_count+
# @raise [KeyError] when the payload is a Hash missing either key
def drop_messages(event)
  key = { client: event.payload.fetch(:client_id) }
  message_count = event.payload.fetch(:message_count)

  # Increment by the batch size so the counter tracks messages, not events.
  @dropped_messages.increment(by: message_count, labels: key)
end
#enqueue_message(event) ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/kafka/prometheus.rb', line 272
#
# Records the async producer's queue depth and fill ratio each time a
# message is enqueued, labelled by client and topic.
#
# @param event [#payload] event whose payload responds to +fetch+ and
#   carries +:client_id+, +:topic+, +:queue_size+ and +:max_queue_size+
# @raise [KeyError] when the payload is a Hash missing any of those keys
def enqueue_message(event)
  key = {
    client: event.payload.fetch(:client_id),
    topic: event.payload.fetch(:topic)
  }

  queue_size = event.payload.fetch(:queue_size)
  max_queue_size = event.payload.fetch(:max_queue_size)
  # Float division so a partially filled queue does not truncate to 0.
  queue_fill_ratio = queue_size.to_f / max_queue_size.to_f

  # This gets us the avg/max queue size per producer.
  @queue_size.observe(queue_size, labels: key)

  # This gets us the avg/max queue fill ratio per producer.
  @queue_fill_ratio.observe(queue_fill_ratio, labels: key)
end