Class: Kafka::Statsd::ConsumerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Kafka::Statsd::ConsumerSubscriber
- Defined in:
- lib/kafka/statsd.rb
Instance Method Summary collapse
- #fetch_batch(event) ⇒ Object
- #join_group(event) ⇒ Object
- #leave_group(event) ⇒ Object
- #pause_status(event) ⇒ Object
- #process_batch(event) ⇒ Object
- #process_message(event) ⇒ Object
- #sync_group(event) ⇒ Object
Instance Method Details
#fetch_batch(event) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/kafka/statsd.rb', line 124
# Emits metrics for a completed batch fetch: a counter for the number of
# messages fetched and a gauge for the consumer's offset lag on the
# topic/partition.
def fetch_batch(event)
  payload = event.payload
  lag = payload.fetch(:offset_lag)
  batch_size = payload.fetch(:message_count)

  # Build the "consumer.<client>.<group>.<topic>.<partition>" key prefix once.
  prefix = [
    "consumer",
    payload.fetch(:client_id),
    payload.fetch(:group_id),
    payload.fetch(:topic),
    payload.fetch(:partition)
  ].join(".")

  count("#{prefix}.batch_size", batch_size)
  gauge("#{prefix}.lag", lag)
end
#join_group(event) ⇒ Object
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/kafka/statsd.rb', line 136
# Records the duration of a consumer-group join, and bumps an error
# counter when the join raised an exception.
def join_group(event)
  payload = event.payload
  key = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}.join_group"

  timing(key, event.duration)
  increment("#{key}.errors") if payload.key?(:exception)
end
#leave_group(event) ⇒ Object
158 159 160 161 162 163 164 165 166 167 |
# File 'lib/kafka/statsd.rb', line 158
# Records the duration of a consumer-group leave, and bumps an error
# counter when leaving raised an exception.
def leave_group(event)
  payload = event.payload
  key = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}.leave_group"

  timing(key, event.duration)
  increment("#{key}.errors") if payload.key?(:exception)
end
#pause_status(event) ⇒ Object
169 170 171 172 173 174 175 176 177 178 |
# File 'lib/kafka/statsd.rb', line 169
# Gauges how long the consumer has had the given topic/partition paused.
def pause_status(event)
  payload = event.payload
  scope = %i[client_id group_id topic partition].map { |k| payload.fetch(k) }.join(".")

  gauge("consumer.#{scope}.pause.duration", payload.fetch(:duration))
end
#process_batch(event) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/kafka/statsd.rb', line 109
# Emits metrics for a processed batch. On failure, increments an error
# counter; on success, records processing latency and counts the number
# of messages in the batch.
#
# NOTE(review): the extracted text had dropped the local variable name in
# `... = event.payload.fetch(:message_count)` and the matching argument in
# the `count` call; restored as `messages`, matching upstream ruby-kafka.
def process_batch(event)
  messages = event.payload.fetch(:message_count)
  client = event.payload.fetch(:client_id)
  group_id = event.payload.fetch(:group_id)
  topic = event.payload.fetch(:topic)
  partition = event.payload.fetch(:partition)

  if event.payload.key?(:exception)
    increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.errors")
  else
    timing("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.latency", event.duration)
    count("consumer.#{client}.#{group_id}.#{topic}.#{partition}.messages", messages)
  end
end
#process_message(event) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/kafka/statsd.rb', line 84
# Emits metrics for a single processed message: latency and a message
# counter on success (or an error counter on failure), plus gauges for
# offset lag and — when the message carries a timestamp — time lag in
# milliseconds.
#
# NOTE(review): the extracted text had dropped the method name
# (`def (event)`); restored as `process_message`, matching the summary
# listing and upstream ruby-kafka.
def process_message(event)
  offset_lag = event.payload.fetch(:offset_lag)
  create_time = event.payload.fetch(:create_time)
  client = event.payload.fetch(:client_id)
  group_id = event.payload.fetch(:group_id)
  topic = event.payload.fetch(:topic)
  partition = event.payload.fetch(:partition)

  # Milliseconds between message creation and now; nil when no timestamp.
  time_lag = create_time && ((Time.now - create_time) * 1000).to_i

  if event.payload.key?(:exception)
    increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_message.errors")
  else
    timing("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_message.latency", event.duration)
    increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.messages")
  end

  gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.lag", offset_lag)

  # Not all messages have timestamps.
  if time_lag
    gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.time_lag", time_lag)
  end
end
#sync_group(event) ⇒ Object
147 148 149 150 151 152 153 154 155 156 |
# File 'lib/kafka/statsd.rb', line 147
# Records the duration of a consumer-group sync, and bumps an error
# counter when the sync raised an exception.
def sync_group(event)
  payload = event.payload
  key = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}.sync_group"

  timing(key, event.duration)
  increment("#{key}.errors") if payload.key?(:exception)
end