Class: Kafka::Statsd::ProducerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Kafka::Statsd::ProducerSubscriber
- Defined in:
- lib/kafka/statsd.rb
Instance Method Summary collapse
- #ack_message(event) ⇒ Object
- #buffer_overflow(event) ⇒ Object
- #deliver_messages(event) ⇒ Object
- #produce_message(event) ⇒ Object
- #topic_error(event) ⇒ Object
Instance Method Details
#ack_message(event) ⇒ Object
231 232 233 234 235 236 237 238 239 240 |
# File 'lib/kafka/statsd.rb', line 231 def ack_message(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) # Number of messages ACK'd for the topic. increment("producer.#{client}.#{topic}.ack.messages") # Histogram of delay between a message being produced and it being ACK'd. timing("producer.#{client}.#{topic}.ack.delay", event.payload.fetch(:delay)) end |
#buffer_overflow(event) ⇒ Object
206 207 208 209 210 211 |
# File 'lib/kafka/statsd.rb', line 206 def buffer_overflow(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) increment("producer.#{client}.#{topic}.produce.errors") end |
#deliver_messages(event) ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/kafka/statsd.rb', line 213 def deliver_messages(event) client = event.payload.fetch(:client_id) message_count = event.payload.fetch(:delivered_message_count) attempts = event.payload.fetch(:attempts) if event.payload.key?(:exception) increment("producer.#{client}.deliver.errors") end timing("producer.#{client}.deliver.latency", event.duration) # Messages delivered to Kafka: count("producer.#{client}.deliver.messages", message_count) # Number of attempts to deliver messages: timing("producer.#{client}.deliver.attempts", attempts) end |
#produce_message(event) ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/kafka/statsd.rb', line 184 def produce_message(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) message_size = event.payload.fetch(:message_size) buffer_size = event.payload.fetch(:buffer_size) max_buffer_size = event.payload.fetch(:max_buffer_size) buffer_fill_ratio = buffer_size.to_f / max_buffer_size.to_f buffer_fill_percentage = buffer_fill_ratio * 100.0 # This gets us the write rate. increment("producer.#{client}.#{topic}.produce.messages") timing("producer.#{client}.#{topic}.produce.message_size", message_size) # This gets us the avg/max buffer size per producer. timing("producer.#{client}.buffer.size", buffer_size) # This gets us the avg/max buffer fill ratio per producer. timing("producer.#{client}.buffer.fill_ratio", buffer_fill_ratio) timing("producer.#{client}.buffer.fill_percentage", buffer_fill_percentage) end |
#topic_error(event) ⇒ Object
242 243 244 245 246 247 |
# File 'lib/kafka/statsd.rb', line 242 def topic_error(event) client = event.payload.fetch(:client_id) topic = event.payload.fetch(:topic) increment("producer.#{client}.#{topic}.ack.errors") end |