Class: Kafka::Datadog::ConsumerSubscriber
- Inherits:
-
StatsdSubscriber
- Object
- ActiveSupport::Subscriber
- StatsdSubscriber
- Kafka::Datadog::ConsumerSubscriber
- Defined in:
- lib/kafka/datadog.rb
Instance Method Summary collapse
- #fetch_batch(event) ⇒ Object
- #join_group(event) ⇒ Object
- #leave_group(event) ⇒ Object
- #loop(event) ⇒ Object
- #pause_status(event) ⇒ Object
- #process_batch(event) ⇒ Object
- #process_message(event) ⇒ Object
- #sync_group(event) ⇒ Object
Instance Method Details
#fetch_batch(event) ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/kafka/datadog.rb', line 196 def fetch_batch(event) lag = event.payload.fetch(:offset_lag) batch_size = event.payload.fetch(:message_count) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), topic: event.payload.fetch(:topic), partition: event.payload.fetch(:partition), } histogram("consumer.batch_size", batch_size, tags: ) gauge("consumer.lag", lag, tags: ) end |
#join_group(event) ⇒ Object
211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/kafka/datadog.rb', line 211 def join_group(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } timing("consumer.join_group", event.duration, tags: ) if event.payload.key?(:exception) increment("consumer.join_group.errors", tags: ) end end |
#leave_group(event) ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/kafka/datadog.rb', line 237 def leave_group(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } timing("consumer.leave_group", event.duration, tags: ) if event.payload.key?(:exception) increment("consumer.leave_group.errors", tags: ) end end |
#loop(event) ⇒ Object
250 251 252 253 254 255 256 257 |
# File 'lib/kafka/datadog.rb', line 250 def loop(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } histogram("consumer.loop.duration", event.duration, tags: ) end |
#pause_status(event) ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/kafka/datadog.rb', line 259 def pause_status(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), topic: event.payload.fetch(:topic), partition: event.payload.fetch(:partition), } duration = event.payload.fetch(:duration) gauge("consumer.pause.duration", duration, tags: ) end |
#process_batch(event) ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/kafka/datadog.rb', line 169 def process_batch(event) offset = event.payload.fetch(:last_offset) = event.payload.fetch(:message_count) create_time = event.payload.fetch(:last_create_time) time_lag = create_time && ((Time.now - create_time) * 1000).to_i = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), topic: event.payload.fetch(:topic), partition: event.payload.fetch(:partition), } if event.payload.key?(:exception) increment("consumer.process_batch.errors", tags: ) else timing("consumer.process_batch.latency", event.duration, tags: ) count("consumer.messages", , tags: ) end gauge("consumer.offset", offset, tags: ) if time_lag gauge("consumer.time_lag", time_lag, tags: ) end end |
#process_message(event) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/kafka/datadog.rb', line 140 def (event) offset = event.payload.fetch(:offset) offset_lag = event.payload.fetch(:offset_lag) create_time = event.payload.fetch(:create_time) time_lag = create_time && ((Time.now - create_time) * 1000).to_i = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), topic: event.payload.fetch(:topic), partition: event.payload.fetch(:partition), } if event.payload.key?(:exception) increment("consumer.process_message.errors", tags: ) else timing("consumer.process_message.latency", event.duration, tags: ) increment("consumer.messages", tags: ) end gauge("consumer.offset", offset, tags: ) gauge("consumer.lag", offset_lag, tags: ) # Not all messages have timestamps. if time_lag gauge("consumer.time_lag", time_lag, tags: ) end end |
#sync_group(event) ⇒ Object
224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/kafka/datadog.rb', line 224 def sync_group(event) = { client: event.payload.fetch(:client_id), group_id: event.payload.fetch(:group_id), } timing("consumer.sync_group", event.duration, tags: ) if event.payload.key?(:exception) increment("consumer.sync_group.errors", tags: ) end end |