Class: Promenade::Kafka::ProducerSubscriber
- Inherits:
-
Subscriber
- Object
- ActiveSupport::Subscriber
- Subscriber
- Promenade::Kafka::ProducerSubscriber
- Defined in:
- lib/promenade/kafka/producer_subscriber.rb
Instance Method Summary collapse
- #ack_message(event) ⇒ Object
- #buffer_overflow(event) ⇒ Object
-
#deliver_messages(event) ⇒ Object
rubocop:disable Metrics/AbcSize.
-
#produce_message(event) ⇒ Object
rubocop:disable Metrics/AbcSize.
- #topic_error(event) ⇒ Object
Instance Method Details
#ack_message(event) ⇒ Object
93 94 95 96 97 98 99 |
# File 'lib/promenade/kafka/producer_subscriber.rb', line 93 def (event) labels = get_labels(event) delay = event.payload.fetch(:delay) Promenade.metric(:kafka_producer_ack_messages).increment(labels) Promenade.metric(:kafka_producer_ack_latency).observe(labels, delay) end |
#buffer_overflow(event) ⇒ Object
78 79 80 |
# File 'lib/promenade/kafka/producer_subscriber.rb', line 78 def buffer_overflow(event) Promenade.metric(:kafka_producer_buffer_overflows).increment(get_labels(event)) end |
#deliver_messages(event) ⇒ Object
rubocop:disable Metrics/AbcSize
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/promenade/kafka/producer_subscriber.rb', line 82 def (event) # rubocop:disable Metrics/AbcSize labels = { client: event.payload.fetch(:client_id) } = event.payload.fetch(:delivered_message_count) attempts = event.payload.fetch(:attempts) Promenade.metric(:kafka_producer_delivery_errors).increment(labels) if event.payload.key?(:exception) Promenade.metric(:kafka_producer_delivery_latency).observe(labels, event.duration) Promenade.metric(:kafka_producer_delivered_messages).increment(labels, ) Promenade.metric(:kafka_producer_delivery_attempts).observe(labels, attempts) end |
#produce_message(event) ⇒ Object
rubocop:disable Metrics/AbcSize
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/promenade/kafka/producer_subscriber.rb', line 64 def (event) # rubocop:disable Metrics/AbcSize labels = get_labels(event) = event.payload.fetch(:message_size) buffer_size = event.payload.fetch(:buffer_size) max_buffer_size = event.payload.fetch(:max_buffer_size) buffer_fill_ratio = buffer_size.to_f / max_buffer_size Promenade.metric(:kafka_producer_messages).increment(labels) Promenade.metric(:kafka_producer_message_size).observe(labels, ) Promenade.metric(:kafka_producer_buffer_size).set(labels.slice(:client), buffer_size) Promenade.metric(:kafka_producer_max_buffer_size).set(labels.slice(:client), max_buffer_size) Promenade.metric(:kafka_producer_buffer_fill_ratio).set(labels.slice(:client), buffer_fill_ratio) end |
#topic_error(event) ⇒ Object
101 102 103 104 105 106 |
# File 'lib/promenade/kafka/producer_subscriber.rb', line 101 def topic_error(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) Promenade.metric(:kafka_producer_ack_errors).increment(client: client, topic: topic) end |