Class: Promenade::Kafka::ConsumerSubscriber
- Inherits:
-
Subscriber
- Object
- ActiveSupport::Subscriber
- Subscriber
- Promenade::Kafka::ConsumerSubscriber
- Defined in:
- lib/promenade/kafka/consumer_subscriber.rb
Instance Method Summary collapse
- #fetch_batch(event) ⇒ Object
- #join_group(event) ⇒ Object
- #leave_group(event) ⇒ Object
- #pause_status(event) ⇒ Object
- #process_batch(event) ⇒ Object
-
#process_message(event) ⇒ Object
rubocop:disable Metrics/AbcSize.
- #sync_group(event) ⇒ Object
Instance Method Details
#fetch_batch(event) ⇒ Object
108 109 110 111 112 113 114 115 |
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 108 def fetch_batch(event) labels = get_labels(event) offset_lag = event.payload.fetch(:offset_lag) = event.payload.fetch(:message_count) Promenade.metric(:kafka_consumer_messages_fetched).increment(labels, ) Promenade.metric(:kafka_consumer_ofset_lag).set(labels, offset_lag) end |
#join_group(event) ⇒ Object
117 118 119 120 121 |
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 117 def join_group(event) labels = group_labels(event) Promenade.metric(:kafka_consumer_join_group).observe(labels, event.duration) Promenade.metric(:kafka_consumer_join_group_errors).increment(labels) if event.payload.key?(:exception) end |
#leave_group(event) ⇒ Object
129 130 131 132 133 |
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 129 def leave_group(event) labels = group_labels(event) Promenade.metric(:kafka_consumer_leave_group).observe(labels, event.duration) Promenade.metric(:kafka_consumer_leave_group_errors).increment(labels) if event.payload.key?(:exception) end |
#pause_status(event) ⇒ Object
135 136 137 |
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 135 def pause_status(event) Promenade.metric(:kafka_consumer_pause_duration).observe(get_labels(event), event.payload.fetch(:duration)) end |
#process_batch(event) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 93 def process_batch(event) labels = get_labels(event) offset_lag = event.payload.fetch(:offset_lag) = event.payload.fetch(:message_count) if event.payload.key?(:exception) Promenade.metric(:kafka_consumer_batch_processing_errors).increment(labels) else Promenade.metric(:kafka_consumer_messages_processed).increment(labels, ) Promenade.metric(:kafka_consumer_batch_processing_latency).observe(labels, event.duration) end Promenade.metric(:kafka_consumer_ofset_lag).set(labels, offset_lag) end |
#process_message(event) ⇒ Object
rubocop:disable Metrics/AbcSize
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 74 def (event) # rubocop:disable Metrics/AbcSize labels = get_labels(event) offset_lag = event.payload.fetch(:offset_lag) create_time = event.payload.fetch(:create_time) time_lag = create_time && ((Time.now.utc - create_time) * 1000).to_i if event.payload.key?(:exception) Promenade.metric(:kafka_consumer_message_processing_errors).increment(labels) else Promenade.metric(:kafka_consumer_messages_processed).increment(labels) Promenade.metric(:kafka_consumer_message_processing_latency).observe(labels, event.duration) end Promenade.metric(:kafka_consumer_ofset_lag).set(labels, offset_lag) # Not all messages have timestamps. Promenade.metric(:kafka_consumer_time_lag).set(labels, time_lag) if time_lag end |
#sync_group(event) ⇒ Object
123 124 125 126 127 |
# File 'lib/promenade/kafka/consumer_subscriber.rb', line 123 def sync_group(event) labels = group_labels(event) Promenade.metric(:kafka_consumer_sync_group).observe(labels, event.duration) Promenade.metric(:kafka_consumer_sync_group_errors).increment(labels) if event.payload.key?(:exception) end |