Class: Promenade::Kafka::ProducerSubscriber

Inherits:
Subscriber
  • Object
show all
Defined in:
lib/promenade/kafka/producer_subscriber.rb

Instance Method Summary collapse

Instance Method Details

#ack_message(event) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/promenade/kafka/producer_subscriber.rb', line 93

def ack_message(event)
  labels = get_labels(event)
  delay = event.payload.fetch(:delay)

  Promenade.metric(:kafka_producer_ack_messages).increment(labels)
  Promenade.metric(:kafka_producer_ack_latency).observe(labels, delay)
end

#buffer_overflow(event) ⇒ Object



78
79
80
# File 'lib/promenade/kafka/producer_subscriber.rb', line 78

def buffer_overflow(event)
  Promenade.metric(:kafka_producer_buffer_overflows).increment(get_labels(event))
end

#deliver_messages(event) ⇒ Object

rubocop:disable Metrics/AbcSize



82
83
84
85
86
87
88
89
90
91
# File 'lib/promenade/kafka/producer_subscriber.rb', line 82

def deliver_messages(event) # rubocop:disable Metrics/AbcSize
  labels = { client: event.payload.fetch(:client_id) }
  message_count = event.payload.fetch(:delivered_message_count)
  attempts = event.payload.fetch(:attempts)

  Promenade.metric(:kafka_producer_delivery_errors).increment(labels) if event.payload.key?(:exception)
  Promenade.metric(:kafka_producer_delivery_latency).observe(labels, event.duration)
  Promenade.metric(:kafka_producer_delivered_messages).increment(labels, message_count)
  Promenade.metric(:kafka_producer_delivery_attempts).observe(labels, attempts)
end

#produce_message(event) ⇒ Object

rubocop:disable Metrics/AbcSize



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/promenade/kafka/producer_subscriber.rb', line 64

def produce_message(event) # rubocop:disable Metrics/AbcSize
  labels = get_labels(event)
  message_size = event.payload.fetch(:message_size)
  buffer_size = event.payload.fetch(:buffer_size)
  max_buffer_size = event.payload.fetch(:max_buffer_size)
  buffer_fill_ratio = buffer_size.to_f / max_buffer_size

  Promenade.metric(:kafka_producer_messages).increment(labels)
  Promenade.metric(:kafka_producer_message_size).observe(labels, message_size)
  Promenade.metric(:kafka_producer_buffer_size).set(labels.slice(:client), buffer_size)
  Promenade.metric(:kafka_producer_max_buffer_size).set(labels.slice(:client), max_buffer_size)
  Promenade.metric(:kafka_producer_buffer_fill_ratio).set(labels.slice(:client), buffer_fill_ratio)
end

#topic_error(event) ⇒ Object



101
102
103
104
105
106
# File 'lib/promenade/kafka/producer_subscriber.rb', line 101

def topic_error(event)
  client = event.payload.fetch(:client_id)
  topic = event.payload.fetch(:topic)

  Promenade.metric(:kafka_producer_ack_errors).increment(client: client, topic: topic)
end