Class: Karafka::Instrumentation::Vendors::Datadog::MetricsListener

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Core::Configurable
Defined in:
lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb

Overview

Note:

You need to setup the ‘dogstatsd-ruby` client and assign it

Listener that can be used to subscribe to Karafka to receive stats via StatsD and/or Datadog

Defined Under Namespace

Classes: RdKafkaMetric

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ MetricsListener

Returns a new instance of MetricsListener.

Parameters:

  • block (Proc)

    configuration block



67
68
69
70
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 67

def initialize(&block)
  configure
  setup(&block) if block
end

Instance Method Details

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Reports how many messages we’ve polled and how much time did we spend on it

Parameters:

  • event (Karafka::Core::Monitoring::Event)


110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 110

def on_connection_listener_fetch_loop_received(event)
  time_taken = event[:time]
  messages_count = event[:messages_buffer].size

  consumer_group_id = event[:subscription_group].consumer_group.id

  tags = ["consumer_group:#{consumer_group_id}"]
  tags.concat(default_tags)

  histogram('listener.polling.time_taken', time_taken, tags: tags)
  histogram('listener.polling.messages', messages_count, tags: tags)
end

#on_consumer_consumed(event) ⇒ Object

Here we report majority of things related to processing as we have access to the consumer

Parameters:

  • event (Karafka::Core::Monitoring::Event)


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 126

def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
   = messages.

  tags = consumer_tags(consumer)
  tags.concat(default_tags)

  count('consumer.messages', messages.count, tags: tags)
  count('consumer.batches', 1, tags: tags)
  gauge('consumer.offset', .last_offset, tags: tags)
  histogram('consumer.consumed.time_taken', event[:time], tags: tags)
  histogram('consumer.batch_size', messages.count, tags: tags)
  histogram('consumer.processing_lag', .processing_lag, tags: tags)
  histogram('consumer.consumption_lag', .consumption_lag, tags: tags)
end

#on_error_occurred(event) ⇒ Object

Increases the errors count by 1

Parameters:

  • event (Karafka::Core::Monitoring::Event)


96
97
98
99
100
101
102
103
104
105
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 96

def on_error_occurred(event)
  tags = ["type:#{event[:type]}"]
  tags.concat(default_tags)

  if event.payload[:caller].respond_to?(:messages)
    tags.concat(consumer_tags(event.payload[:caller]))
  end

  count('error_occurred', 1, tags: tags)
end

#on_statistics_emitted(event) ⇒ Object

Hooks up to Karafka instrumentation for emitted statistics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


81
82
83
84
85
86
87
88
89
90
91
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 81

def on_statistics_emitted(event)
  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]

  tags = ["consumer_group:#{consumer_group_id}"]
  tags.concat(default_tags)

  rd_kafka_metrics.each do |metric|
    report_metric(metric, statistics, tags)
  end
end

#on_worker_process(event) ⇒ Object

Worker related metrics

Parameters:

  • event (Karafka::Core::Monitoring::Event)


163
164
165
166
167
168
169
170
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 163

def on_worker_process(event)
  jq_stats = event[:jobs_queue].statistics

  tags = default_tags
  gauge('worker.total_threads', Karafka::App.config.concurrency, tags: tags)
  histogram('worker.processing', jq_stats[:busy], tags: tags)
  histogram('worker.enqueued_jobs', jq_stats[:enqueued], tags: tags)
end

#on_worker_processed(event) ⇒ Object

We report this metric before and after processing for higher accuracy Without this, the utilization would not be fully reflected

Parameters:

  • event (Karafka::Core::Monitoring::Event)


175
176
177
178
179
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 175

def on_worker_processed(event)
  jq_stats = event[:jobs_queue].statistics

  histogram('worker.processing', jq_stats[:busy], tags: default_tags)
end

#setup(&block) ⇒ Object

Note:

We define this alias to be consistent with ‘WaterDrop#setup`

Parameters:

  • block (Proc)

    configuration block



74
75
76
# File 'lib/karafka/instrumentation/vendors/datadog/metrics_listener.rb', line 74

def setup(&block)
  configure(&block)
end