Class: Sbmt::KafkaProducer::Instrumentation::YabedaMetricsListener
- Inherits:
-
Object
- Object
- Sbmt::KafkaProducer::Instrumentation::YabedaMetricsListener
- Defined in:
- lib/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener.rb
Constant Summary collapse
- DEFAULT_CLIENT = {client: "waterdrop"}.freeze
Instance Method Summary collapse
- #on_error_occurred(event) ⇒ Object
- #on_message_acknowledged(event) ⇒ Object
- #on_message_buffered(event) ⇒ Object
- #on_statistics_emitted(event) ⇒ Object
Instance Method Details
#on_error_occurred(event) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener.rb', line 16
#
# Increments a Yabeda error counter chosen by the event's +:type+:
# produce errors for "message.produce_sync" / "message.produce_async",
# delivery errors for "librdkafka.dispatch_error". Other types are ignored.
#
# NOTE(review): the rendered listing this snippet was taken from had lost three
# identifiers: the local that receives the tag hash, the helper applied to
# +event+ in the produce branch, and the argument of the delivery-error
# increment. They are restored here as +tags+ and +produce_base_tags+; the
# helper name in particular is presumed -- confirm against the gem source.
#
# @param event [#[], #payload] instrumentation event; +:type+ selects the counter
# @return [Object, nil] result of the matching +increment+ call, or nil when
#   the type matches no branch
def on_error_occurred(event)
  # The tag hash is only built when the payload carries a :topic; otherwise it stays nil.
  tags = {topic: event[:topic]}.merge!(DEFAULT_CLIENT) if event.payload.include?(:topic)

  case event[:type]
  when "message.produce_sync", "message.produce_async"
    Yabeda.kafka_producer.produce_errors
      .increment(produce_base_tags(event))
  when "librdkafka.dispatch_error"
    Yabeda.kafka_producer.deliver_errors
      .increment(tags)
  end
end
#on_message_acknowledged(event) ⇒ Object
47 48 49 50 51 52 |
# File 'lib/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener.rb', line 47
#
# Increments a Yabeda counter tagged with the event's topic and DEFAULT_CLIENT.
#
# NOTE(review): the rendered listing this snippet was taken from had lost the
# method name and the metric name. The method name is restored from the
# section heading; the metric is presumed to be +deliver_messages+ -- confirm
# against the Yabeda metric definitions of the gem.
#
# @param event [#[]] instrumentation event exposing +:topic+
# @return [Object] whatever the counter's +increment+ returns
def on_message_acknowledged(event)
  # merge! mutates the fresh literal hash, not the frozen DEFAULT_CLIENT.
  tag = {topic: event[:topic]}.merge!(DEFAULT_CLIENT)

  Yabeda.kafka_producer.deliver_messages
    .increment(tag)
end
#on_message_buffered(event) ⇒ Object
42 43 44 45 |
# File 'lib/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener.rb', line 42
#
# Records +event[:buffer].size+ into the Yabeda +kafka_producer.buffer_size+
# metric, tagged with DEFAULT_CLIENT.
#
# The method name was missing from the rendered listing and is restored from
# the section heading.
#
# @param event [#[]] instrumentation event whose +:buffer+ entry responds to +size+
# @return [Object] whatever the metric's +measure+ returns
def on_message_buffered(event)
  Yabeda.kafka_producer.buffer_size
    .measure(DEFAULT_CLIENT, event[:buffer].size)
end
#on_statistics_emitted(event) ⇒ Object
8 9 10 11 12 13 14 |
# File 'lib/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener.rb', line 8
#
# Takes the "brokers" entry of the statistics carried in the event payload
# and passes it to +report_broker_stats+.
#
# Statistics layout reference:
# https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
#
# @param event [#payload] instrumentation event whose payload has +:statistics+
# @return [Object] whatever +report_broker_stats+ returns
def on_statistics_emitted(event)
  per_broker = event.payload[:statistics]["brokers"]
  report_broker_stats(per_broker)
end