Class: Racecar::Datadog::ConsumerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/racecar/datadog.rb

Instance Method Summary collapse

Instance Method Details

#join_group(event) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/racecar/datadog.rb', line 138

# Reports metrics for a consumer group join event.
#
# The event duration is always sent as the "consumer.join_group" timing.
# When the payload carries an :exception key, the
# "consumer.join_group.errors" counter is incremented as well. Both metrics
# are tagged with the client id and group id read from the payload.
#
# @param event [#payload, #duration] instrumentation event; :client_id and
#   :group_id are read with `fetch`, so they are required payload keys
def join_group(event)
  payload = event.payload
  group_tags = { client: payload.fetch(:client_id), group_id: payload.fetch(:group_id) }

  timing("consumer.join_group", event.duration, tags: group_tags)

  # Only the presence of the key matters; the exception value is not inspected.
  increment("consumer.join_group.errors", tags: group_tags) if payload.key?(:exception)
end

#leave_group(event) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/racecar/datadog.rb', line 151

# Reports metrics for a consumer group leave event.
#
# The event duration is always sent as the "consumer.leave_group" timing.
# When the payload carries an :exception key, the
# "consumer.leave_group.errors" counter is incremented as well. Both metrics
# are tagged with the client id and group id read from the payload.
#
# @param event [#payload, #duration] instrumentation event; :client_id and
#   :group_id are read with `fetch`, so they are required payload keys
def leave_group(event)
  payload = event.payload
  group_tags = { client: payload.fetch(:client_id), group_id: payload.fetch(:group_id) }

  timing("consumer.leave_group", event.duration, tags: group_tags)

  # Only the presence of the key matters; the exception value is not inspected.
  increment("consumer.leave_group.errors", tags: group_tags) if payload.key?(:exception)
end

#main_loop(event) ⇒ Object



173
174
175
176
177
178
179
180
# File 'lib/racecar/datadog.rb', line 173

# Reports the duration of one consumer main-loop event.
#
# Sends event.duration to the "consumer.loop.duration" histogram, tagged
# with the client id and group id read from the payload.
#
# @param event [#payload, #duration] instrumentation event; :client_id and
#   :group_id are read with `fetch`, so they are required payload keys
# @return whatever the `histogram` helper returns
def main_loop(event)
  payload = event.payload
  loop_tags = { client: payload.fetch(:client_id), group_id: payload.fetch(:group_id) }

  histogram("consumer.loop.duration", event.duration, tags: loop_tags)
end

#pause_status(event) ⇒ Object



182
183
184
185
186
# File 'lib/racecar/datadog.rb', line 182

# Reports how long a consumer pause has lasted.
#
# Sends the payload's :duration value to the "consumer.pause.duration"
# gauge. Note that the value comes from the payload, not from
# event.duration.
#
# @param event [#payload] instrumentation event; :duration is read with
#   `fetch`, so it is a required payload key. The event is also handed to
#   `default_tags` to build the metric tags.
# @return whatever the `gauge` helper returns
def pause_status(event)
  # Arguments evaluate left to right: the payload fetch runs before default_tags.
  gauge("consumer.pause.duration", event.payload.fetch(:duration), tags: default_tags(event))
end

#poll_retry(event) ⇒ Object



164
165
166
167
168
169
170
171
# File 'lib/racecar/datadog.rb', line 164

# Counts a poll retry, bucketed by the error code of the triggering exception.
#
# The exception's code is stringified and stripped of every non-word
# character so it can be embedded in the metric name
# "consumer.poll.rdkafka_error.<code>". The counter is tagged with the
# client id and group id read from the payload.
#
# @param event [#payload] instrumentation event; :client_id, :group_id and
#   :exception are read with `fetch`, so they are required payload keys. The
#   exception object must respond to `code`.
# @return whatever the `increment` helper returns
def poll_retry(event)
  payload = event.payload
  retry_tags = { client: payload.fetch(:client_id), group_id: payload.fetch(:group_id) }

  error = payload.fetch(:exception)
  # Keep only [A-Za-z0-9_] so the code is usable as a metric name segment.
  sanitized_code = error.code.to_s.gsub(/\W/, '')
  metric_name = "consumer.poll.rdkafka_error.#{sanitized_code}"

  increment(metric_name, tags: retry_tags)
end

#process_batch(event) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/racecar/datadog.rb', line 116

# Reports metrics for one processed batch of messages.
#
# Emitted metrics, all tagged with `default_tags(event)`:
# * on failure (payload has an :exception key): the
#   "consumer.process_batch.errors" counter;
# * on success: the "consumer.process_batch.latency" timing from
#   event.duration and the "consumer.messages" count of the batch size;
# * always: the "consumer.batch_size" histogram and the "consumer.offset" gauge;
# * only when :last_create_time is present: the "consumer.time_lag" gauge.
#
# @param event [#payload, #duration] instrumentation event; :last_offset,
#   :message_count and :last_create_time are read with `fetch`, so they are
#   required payload keys (the create time value itself may be nil)
def process_batch(event)
  payload = event.payload
  last_offset = payload.fetch(:last_offset)
  batch_size = payload.fetch(:message_count)
  newest_create_time = payload.fetch(:last_create_time)
  # Time elapsed since the newest message was created; the * 1000 yields
  # milliseconds assuming the value is a Time (Time - Time is in seconds).
  lag_ms = ((Time.now - newest_create_time) * 1000).to_i if newest_create_time
  batch_tags = default_tags(event)

  if payload.key?(:exception)
    increment("consumer.process_batch.errors", tags: batch_tags)
  else
    timing("consumer.process_batch.latency", event.duration, tags: batch_tags)
    count("consumer.messages", batch_size, tags: batch_tags)
  end

  histogram("consumer.batch_size", batch_size, tags: batch_tags)
  gauge("consumer.offset", last_offset, tags: batch_tags)

  # The lag is skipped when the batch carried no creation timestamp.
  gauge("consumer.time_lag", lag_ms, tags: batch_tags) if lag_ms
end

#process_message(event) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/racecar/datadog.rb', line 95

# Reports metrics for one processed message.
#
# Emitted metrics, all tagged with `default_tags(event)`:
# * on failure (payload has an :exception key): the
#   "consumer.process_message.errors" counter;
# * on success: the "consumer.process_message.latency" timing from
#   event.duration and one increment of "consumer.messages";
# * always: the "consumer.offset" gauge;
# * only when :create_time is present: the "consumer.time_lag" gauge.
#
# @param event [#payload, #duration] instrumentation event; :offset and
#   :create_time are read with `fetch`, so they are required payload keys
#   (the create time value itself may be nil)
def process_message(event)
  payload = event.payload
  message_offset = payload.fetch(:offset)
  created_at = payload.fetch(:create_time)
  # Time elapsed since the message was created; the * 1000 yields
  # milliseconds assuming the value is a Time (Time - Time is in seconds).
  lag_ms = ((Time.now - created_at) * 1000).to_i if created_at
  message_tags = default_tags(event)

  if payload.key?(:exception)
    increment("consumer.process_message.errors", tags: message_tags)
  else
    timing("consumer.process_message.latency", event.duration, tags: message_tags)
    increment("consumer.messages", tags: message_tags)
  end

  gauge("consumer.offset", message_offset, tags: message_tags)

  # Not all messages have timestamps.
  gauge("consumer.time_lag", lag_ms, tags: message_tags) if lag_ms
end