Class: Promenade::Kafka::ConsumerSubscriber

Inherits:
Subscriber
  • Object
show all
Defined in:
lib/promenade/kafka/consumer_subscriber.rb

Instance Method Summary collapse

Instance Method Details

#fetch_batch(event) ⇒ Object



108
109
110
111
112
113
114
115
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 108

# Records metrics for a batch-fetch event.
#
# Reads +:offset_lag+ and +:message_count+ from the event payload (both via
# +fetch+, before any metric is touched), then passes the message count to
# +kafka_consumer_messages_fetched.increment+ and the offset lag to
# +kafka_consumer_ofset_lag.set+.
#
# NOTE(review): the gauge name is spelled "ofset"; presumably it matches the
# name the gauge is registered under elsewhere -- confirm before renaming.
#
# @param event [#payload] instrumentation event; labels come from get_labels
# @return [Object] whatever the final +set+ call returns
def fetch_batch(event)
  labels = get_labels(event)
  payload = event.payload
  lag = payload.fetch(:offset_lag)
  fetched_count = payload.fetch(:message_count)

  Promenade.metric(:kafka_consumer_messages_fetched).increment(labels, fetched_count)
  Promenade.metric(:kafka_consumer_ofset_lag).set(labels, lag)
end

#join_group(event) ⇒ Object



117
118
119
120
121
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 117

# Records metrics for a join-group event.
#
# Always observes +event.duration+ on +kafka_consumer_join_group+. When the
# payload has an +:exception+ key, it additionally increments
# +kafka_consumer_join_group_errors+.
#
# @param event [#payload, #duration] instrumentation event; labels come from
#   group_labels
# @return [Object, nil] the +increment+ result when an exception key is
#   present, otherwise nil
def join_group(event)
  labels = group_labels(event)
  Promenade.metric(:kafka_consumer_join_group).observe(labels, event.duration)
  return unless event.payload.key?(:exception)

  Promenade.metric(:kafka_consumer_join_group_errors).increment(labels)
end

#leave_group(event) ⇒ Object



129
130
131
132
133
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 129

# Records metrics for a leave-group event.
#
# Observes +event.duration+ on +kafka_consumer_leave_group+, then increments
# +kafka_consumer_leave_group_errors+ only if the payload has an
# +:exception+ key.
#
# @param event [#payload, #duration] instrumentation event; labels come from
#   group_labels
# @return [Object, nil] the +increment+ result when an exception key is
#   present, otherwise nil
def leave_group(event)
  labels = group_labels(event)
  Promenade.metric(:kafka_consumer_leave_group).observe(labels, event.duration)

  if event.payload.key?(:exception)
    Promenade.metric(:kafka_consumer_leave_group_errors).increment(labels)
  end
end

#pause_status(event) ⇒ Object



135
136
137
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 135

# Records the pause duration reported by a pause-status event.
#
# The observed value is the payload's +:duration+ entry (read with +fetch+),
# not +event.duration+.
#
# @param event [#payload] instrumentation event; labels come from get_labels
# @return [Object] whatever +observe+ returns
def pause_status(event)
  pause_metric = Promenade.metric(:kafka_consumer_pause_duration)
  labels = get_labels(event)
  pause_metric.observe(labels, event.payload.fetch(:duration))
end

#process_batch(event) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 93

# Records metrics for a batch-processing event.
#
# +:offset_lag+ and +:message_count+ are both read from the payload with
# +fetch+ before any metric is touched. If the payload has an +:exception+
# key, only +kafka_consumer_batch_processing_errors+ is incremented;
# otherwise the message count is passed to
# +kafka_consumer_messages_processed.increment+ and +event.duration+ is
# observed on +kafka_consumer_batch_processing_latency+. In either case the
# offset lag is then written to +kafka_consumer_ofset_lag+.
#
# NOTE(review): the gauge name is spelled "ofset"; presumably it matches the
# name the gauge is registered under elsewhere -- confirm before renaming.
#
# @param event [#payload, #duration] instrumentation event; labels come from
#   get_labels
# @return [Object] whatever the final +set+ call returns
def process_batch(event)
  labels = get_labels(event)
  payload = event.payload
  lag = payload.fetch(:offset_lag)
  batch_size = payload.fetch(:message_count)

  failed = payload.key?(:exception)
  if failed
    Promenade.metric(:kafka_consumer_batch_processing_errors).increment(labels)
  else
    Promenade.metric(:kafka_consumer_messages_processed).increment(labels, batch_size)
    Promenade.metric(:kafka_consumer_batch_processing_latency).observe(labels, event.duration)
  end

  Promenade.metric(:kafka_consumer_ofset_lag).set(labels, lag)
end

#process_message(event) ⇒ Object

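Records per-message processing metrics: errors or processed count and latency, offset lag, and (when the message has a create time) time lag. Carries an inline `rubocop:disable Metrics/AbcSize` directive.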



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 74

# Records metrics for a single-message processing event.
#
# +:offset_lag+ and +:create_time+ are read from the payload with +fetch+
# before any metric is touched. If the payload has an +:exception+ key,
# +kafka_consumer_message_processing_errors+ is incremented; otherwise
# +kafka_consumer_messages_processed+ is incremented and +event.duration+ is
# observed on +kafka_consumer_message_processing_latency+. The offset lag is
# then written to +kafka_consumer_ofset_lag+, and the time lag to
# +kafka_consumer_time_lag+ when a create time was present.
#
# NOTE(review): the gauge name is spelled "ofset"; presumably it matches the
# name the gauge is registered under elsewhere -- confirm before renaming.
#
# @param event [#payload, #duration] instrumentation event; labels come from
#   get_labels
# @return [Object, nil] the time-lag +set+ result, or nil when the payload's
#   +:create_time+ is nil
def process_message(event) # rubocop:disable Metrics/AbcSize
  labels = get_labels(event)
  payload = event.payload
  lag = payload.fetch(:offset_lag)
  created_at = payload.fetch(:create_time)
  # Assuming create_time is a Time, this is the whole number of milliseconds
  # elapsed since it; stays nil when there is no create time.
  age_ms = ((Time.now.utc - created_at) * 1000).to_i if created_at

  failed = payload.key?(:exception)
  if failed
    Promenade.metric(:kafka_consumer_message_processing_errors).increment(labels)
  else
    Promenade.metric(:kafka_consumer_messages_processed).increment(labels)
    Promenade.metric(:kafka_consumer_message_processing_latency).observe(labels, event.duration)
  end

  Promenade.metric(:kafka_consumer_ofset_lag).set(labels, lag)

  # Not all messages have timestamps.
  if age_ms
    Promenade.metric(:kafka_consumer_time_lag).set(labels, age_ms)
  end
end

#sync_group(event) ⇒ Object



123
124
125
126
127
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 123

# Records metrics for a sync-group event.
#
# Observes +event.duration+ on +kafka_consumer_sync_group+ and, if the
# payload has an +:exception+ key, increments
# +kafka_consumer_sync_group_errors+.
#
# @param event [#payload, #duration] instrumentation event; labels come from
#   group_labels
# @return [Object, nil] the +increment+ result when an exception key is
#   present, otherwise nil
def sync_group(event)
  labels = group_labels(event)
  Promenade.metric(:kafka_consumer_sync_group).observe(labels, event.duration)

  failed = event.payload.key?(:exception)
  Promenade.metric(:kafka_consumer_sync_group_errors).increment(labels) if failed
end