Class: Racecar::Datadog::ProducerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/racecar/datadog.rb

Instance Method Summary collapse

Instance Method Details

#acknowledged_message(event) ⇒ Object



258
259
260
261
262
263
# File 'lib/racecar/datadog.rb', line 258

# Counts one acknowledged message, tagged with the producing client's id.
#
# @param event [#payload] event whose payload provides :client_id
# @raise [KeyError] if the payload has no :client_id entry
# @return [Object] whatever the underlying `increment` call returns
def acknowledged_message(event)
  client_id = event.payload.fetch(:client_id)

  # One more message ACK'd for this client.
  increment("producer.ack.messages", tags: { client: client_id })
end

#deliver_messages(event) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/racecar/datadog.rb', line 244

# Reports how long a delivery took and how many messages it delivered,
# both tagged with the producing client's id.
#
# @param event [#payload, #duration] event whose payload provides
#   :client_id and :delivered_message_count
# @raise [KeyError] if either payload entry is missing
# @return [Object] whatever the final `count` call returns
def deliver_messages(event)
  payload = event.payload
  tags = { client: payload.fetch(:client_id) }
  delivered = payload.fetch(:delivered_message_count)

  # Latency of the delivery, taken from the event's duration.
  timing("producer.deliver.latency", event.duration, tags: tags)

  # Messages delivered to Kafka.
  count("producer.deliver.messages", delivered, tags: tags)
end

#produce_async(event) ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/racecar/datadog.rb', line 273

# Emits per-message producer metrics for an asynchronous produce event.
#
# Every metric is tagged with the client id and topic. The tag hash already
# contains the topic, so it is passed as-is to each metric call rather than
# being re-merged with `topic:` each time.
#
# @param event [#payload] event whose payload provides :client_id, :topic,
#   :message_size and :buffer_size, and optionally :exception
# @raise [KeyError] if any of the four required payload entries is missing
# @return [Object] whatever the final `histogram` call returns
def produce_async(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  topic = payload.fetch(:topic)
  message_size = payload.fetch(:message_size)
  buffer_size = payload.fetch(:buffer_size)

  tags = {
    client: client,
    topic: topic,
  }

  # An :exception key in the payload marks a failed produce attempt.
  increment("producer.produce.errors", tags: tags) if payload.key?(:exception)

  # This gets us the write rate.
  increment("producer.produce.messages", tags: tags)

  # Information about typical/average/95p message size.
  histogram("producer.produce.message_size", message_size, tags: tags)

  # Aggregate message size.
  count("producer.produce.message_size.sum", message_size, tags: tags)

  # This gets us the avg/max buffer size per producer.
  histogram("producer.buffer.size", buffer_size, tags: tags)
end

#produce_delivery_error(event) ⇒ Object



265
266
267
268
269
270
271
# File 'lib/racecar/datadog.rb', line 265

# Counts one delivery error, tagged with the producing client's id.
#
# @param event [#payload] event whose payload provides :client_id
# @raise [KeyError] if the payload has no :client_id entry
# @return [Object] whatever the underlying `increment` call returns
def produce_delivery_error(event)
  client_id = event.payload.fetch(:client_id)

  increment("producer.produce.delivery.errors", tags: { client: client_id })
end

#produce_message(event) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/racecar/datadog.rb', line 216

# Emits per-message producer metrics for a produce event.
#
# Every metric is tagged with the client id and topic. The tag hash already
# contains the topic, so it is passed as-is to each metric call rather than
# being re-merged with `topic:` each time.
#
# @param event [#payload] event whose payload provides :client_id, :topic,
#   :message_size and :buffer_size, and optionally :exception
# @raise [KeyError] if any of the four required payload entries is missing
# @return [Object] whatever the final `histogram` call returns
def produce_message(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  topic = payload.fetch(:topic)
  message_size = payload.fetch(:message_size)
  buffer_size = payload.fetch(:buffer_size)

  tags = {
    client: client,
    topic: topic,
  }

  # An :exception key in the payload marks a failed produce attempt.
  increment("producer.produce.errors", tags: tags) if payload.key?(:exception)

  # This gets us the write rate.
  increment("producer.produce.messages", tags: tags)

  # Information about typical/average/95p message size.
  histogram("producer.produce.message_size", message_size, tags: tags)

  # Aggregate message size.
  count("producer.produce.message_size.sum", message_size, tags: tags)

  # This gets us the avg/max buffer size per producer.
  histogram("producer.buffer.size", buffer_size, tags: tags)
end

#produce_sync(event) ⇒ Object



301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/racecar/datadog.rb', line 301

# Emits per-message producer metrics for a synchronous produce event.
#
# Every metric is tagged with the client id and topic. The tag hash already
# contains the topic, so it is passed as-is to each metric call rather than
# being re-merged with `topic:` each time. No buffer-size metric is emitted
# here; this method does not read :buffer_size from the payload.
#
# @param event [#payload] event whose payload provides :client_id, :topic
#   and :message_size, and optionally :exception
# @raise [KeyError] if any of the three required payload entries is missing
# @return [Object] whatever the final `count` call returns
def produce_sync(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  topic = payload.fetch(:topic)
  message_size = payload.fetch(:message_size)

  tags = {
    client: client,
    topic: topic,
  }

  # An :exception key in the payload marks a failed produce attempt.
  increment("producer.produce.errors", tags: tags) if payload.key?(:exception)

  # This gets us the write rate.
  increment("producer.produce.messages", tags: tags)

  # Information about typical/average/95p message size.
  histogram("producer.produce.message_size", message_size, tags: tags)

  # Aggregate message size.
  count("producer.produce.message_size.sum", message_size, tags: tags)
end