Class: Kafka::Prometheus::ProducerSubscriber
- Inherits: ActiveSupport::Subscriber
  - Object
  - ActiveSupport::Subscriber
  - Kafka::Prometheus::ProducerSubscriber
- Defined in:
- lib/kafka/prometheus.rb
Instance Method Summary collapse
- #ack_message(event) ⇒ Object
- #buffer_overflow(event) ⇒ Object
- #deliver_messages(event) ⇒ Object
- #initialize ⇒ ProducerSubscriber (constructor): A new instance of ProducerSubscriber.
- #produce_message(event) ⇒ Object
- #topic_error(event) ⇒ Object
Constructor Details
#initialize ⇒ ProducerSubscriber
Returns a new instance of ProducerSubscriber.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/kafka/prometheus.rb', line 185

# Builds the subscriber and registers, on Prometheus.registry, every counter
# and histogram that the event handlers of this class update.
#
# Metrics labelled [:client, :topic] are tracked per client and topic; metrics
# labelled [:client] are tracked per client only.
#
# @return [ProducerSubscriber] a new instance of ProducerSubscriber
def initialize
  super

  registry = Prometheus.registry

  # Metrics updated by #produce_message.
  @produce_messages = registry.counter(
    :producer_produced_messages,
    docstring: 'Produced messages total',
    labels: [:client, :topic]
  )
  @produce_message_size = registry.histogram(
    :producer_message_size,
    docstring: 'Message size',
    buckets: SIZE_BUCKETS,
    labels: [:client, :topic]
  )
  @buffer_size = registry.histogram(
    :producer_buffer_size,
    docstring: 'Buffer size',
    buckets: SIZE_BUCKETS,
    labels: [:client]
  )
  @buffer_fill_ratio = registry.histogram(
    :producer_buffer_fill_ratio,
    docstring: 'Buffer fill ratio',
    labels: [:client]
  )
  @buffer_fill_percentage = registry.histogram(
    :producer_buffer_fill_percentage,
    docstring: 'Buffer fill percentage',
    labels: [:client]
  )

  # Metric updated by #buffer_overflow.
  @produce_errors = registry.counter(
    :producer_produce_errors,
    docstring: 'Produce errors',
    labels: [:client, :topic]
  )

  # Metrics updated by #deliver_messages.
  @deliver_errors = registry.counter(
    :producer_deliver_errors,
    docstring: 'Deliver error',
    labels: [:client]
  )
  @deliver_latency = registry.histogram(
    :producer_deliver_latency,
    docstring: 'Delivery latency',
    buckets: LATENCY_BUCKETS,
    labels: [:client]
  )
  @deliver_messages = registry.counter(
    :producer_deliver_messages,
    docstring: 'Total count of delivered messages',
    labels: [:client]
  )
  @deliver_attempts = registry.histogram(
    :producer_deliver_attempts,
    docstring: 'Delivery attempts',
    labels: [:client]
  )

  # Metrics updated by #ack_message and #topic_error.
  @ack_messages = registry.counter(
    :producer_ack_messages,
    docstring: 'Ack',
    labels: [:client, :topic]
  )
  @ack_delay = registry.histogram(
    :producer_ack_delay,
    docstring: 'Ack delay',
    buckets: LATENCY_BUCKETS,
    labels: [:client, :topic]
  )
  @ack_errors = registry.counter(
    :producer_ack_errors,
    docstring: 'Ack errors',
    labels: [:client, :topic]
  )
end
Instance Method Details
#ack_message(event) ⇒ Object
246 247 248 249 250 251 252 253 254 |
# File 'lib/kafka/prometheus.rb', line 246

# Records one acknowledged message: bumps the ACK counter and observes the
# ACK delay, both labelled with the event's client and topic.
#
# @param event [#payload] event whose payload supplies :client_id, :topic
#   and :delay
# @raise [KeyError] when the payload is a Hash and one of those keys is missing
# @return [Object] the result of the final @ack_delay.observe call
def ack_message(event)
  key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }

  # Number of messages ACK'd for the topic.
  @ack_messages.increment(labels: key)

  # Histogram of delay between a message being produced and it being ACK'd.
  @ack_delay.observe(event.payload.fetch(:delay), labels: key)
end
#buffer_overflow(event) ⇒ Object
226 227 228 229 |
# File 'lib/kafka/prometheus.rb', line 226

# Counts a buffer overflow as a produce error for the event's client and topic.
#
# @param event [#payload] event whose payload supplies :client_id and :topic
# @raise [KeyError] when the payload is a Hash and one of those keys is missing
# @return [Object] the result of @produce_errors.increment
def buffer_overflow(event)
  key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }

  @produce_errors.increment(labels: key)
end
#deliver_messages(event) ⇒ Object
231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/kafka/prometheus.rb', line 231

# Records the outcome of one delivery: latency, delivered message count and
# attempt count, plus a delivery error when the payload carries :exception.
# All metrics are labelled with the event's client only.
#
# @param event [#payload, #duration] event whose payload supplies :client_id,
#   :delivered_message_count and :attempts (and optionally :exception)
# @raise [KeyError] when the payload is a Hash and a required key is missing
# @return [Object] the result of the final @deliver_attempts.observe call
def deliver_messages(event)
  key = { client: event.payload.fetch(:client_id) }
  message_count = event.payload.fetch(:delivered_message_count)
  attempts = event.payload.fetch(:attempts)

  # Only the presence of the :exception key matters, not its value.
  @deliver_errors.increment(labels: key) if event.payload.key?(:exception)
  @deliver_latency.observe(event.duration, labels: key)

  # Messages delivered to Kafka:
  @deliver_messages.increment(by: message_count, labels: key)

  # Number of attempts to deliver messages:
  @deliver_attempts.observe(attempts, labels: key)
end
#produce_message(event) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/kafka/prometheus.rb', line 204

# Records one produced message: the per-topic message count and size, and the
# per-client buffer size, fill ratio and fill percentage.
#
# @param event [#payload] event whose payload supplies :client_id, :topic,
#   :message_size, :buffer_size and :max_buffer_size
# @raise [KeyError] when the payload is a Hash and one of those keys is missing
# @return [Object] the result of the final @buffer_fill_percentage.observe call
def produce_message(event)
  client = event.payload.fetch(:client_id)
  key = { client: client, topic: event.payload.fetch(:topic) }

  message_size = event.payload.fetch(:message_size)
  buffer_size = event.payload.fetch(:buffer_size)
  max_buffer_size = event.payload.fetch(:max_buffer_size)

  # Float division so an Integer buffer size does not truncate the ratio.
  buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f
  buffer_fill_percentage = buffer_fill_ratio * 100.0

  # This gets us the write rate.
  @produce_messages.increment(labels: key)
  @produce_message_size.observe(message_size, labels: key)

  # This gets us the avg/max buffer size per producer.
  @buffer_size.observe(buffer_size, labels: { client: client })

  # This gets us the avg/max buffer fill ratio per producer.
  @buffer_fill_ratio.observe(buffer_fill_ratio, labels: { client: client })
  @buffer_fill_percentage.observe(buffer_fill_percentage, labels: { client: client })
end
#topic_error(event) ⇒ Object
256 257 258 259 260 |
# File 'lib/kafka/prometheus.rb', line 256

# Counts an ACK error for the event's client and topic.
#
# @param event [#payload] event whose payload supplies :client_id and :topic
# @raise [KeyError] when the payload is a Hash and one of those keys is missing
# @return [Object] the result of @ack_errors.increment
def topic_error(event)
  key = { client: event.payload.fetch(:client_id), topic: event.payload.fetch(:topic) }

  @ack_errors.increment(labels: key)
end