Class: Karafka::Instrumentation::Callbacks::Statistics

Inherits:
Object
  • Object
Defined in:
lib/karafka/instrumentation/callbacks/statistics.rb

Overview

Statistics callback handler

See Also:

  • for details on why we decorate those statistics

Instance Method Summary

Constructor Details

#initialize(subscription_group_id, consumer_group_id, client_name) ⇒ Statistics

Returns a new instance of Statistics.

Parameters:

  • subscription_group_id (String)

    id of the current subscription group

  • consumer_group_id (String)

    id of the current consumer group

  • client_name (String)

    rdkafka client name



# File 'lib/karafka/instrumentation/callbacks/statistics.rb', line 13

def initialize(subscription_group_id, consumer_group_id, client_name)
  @subscription_group_id = subscription_group_id
  @consumer_group_id = consumer_group_id
  @client_name = client_name
  @statistics_decorator = ::Karafka::Core::Monitoring::StatisticsDecorator.new
end

Instance Method Details

#call(statistics) ⇒ Object

Emits decorated statistics to the monitor

Parameters:

  • statistics (Hash)

    rdkafka statistics



# File 'lib/karafka/instrumentation/callbacks/statistics.rb', line 22

def call(statistics)
  # Emit only statistics related to our client.
  # rdkafka does not have a per-instance statistics hook, thus we need to make sure that we
  # emit only stats that are related to the current client. Otherwise we would emit all of
  # them all the time.
  return unless @client_name == statistics['name']

  ::Karafka.monitor.instrument(
    'statistics.emitted',
    subscription_group_id: @subscription_group_id,
    consumer_group_id: @consumer_group_id,
    statistics: @statistics_decorator.call(statistics)
  )
# We need to catch and handle any potential errors coming from the instrumentation pipeline
# as otherwise, in case of statistics which run in the main librdkafka thread, any crash
# will hang the whole process.
rescue StandardError => e
  ::Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    subscription_group_id: @subscription_group_id,
    consumer_group_id: @consumer_group_id,
    type: 'statistics.emitted.error',
    error: e
  )
end
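
The name filter and the error fallback in #call can be tried out without a running Karafka process. The following is only a minimal sketch under stated assumptions: RecordingMonitor and SketchStatisticsCallback are hypothetical stand-ins written for this example (they are not part of Karafka), the monitor is injected instead of being read from ::Karafka.monitor, and the decorator is replaced with an identity lambda, so the real decoration step is not reproduced.

```ruby
# Hypothetical stand-in for ::Karafka.monitor: records every instrumented event
# and can be told to fail on one event id to simulate a broken listener.
class RecordingMonitor
  attr_reader :events

  def initialize(fail_on: nil)
    @events = []
    @fail_on = fail_on
  end

  def instrument(event_id, payload = {})
    raise 'listener failure' if event_id == @fail_on

    @events << [event_id, payload]
  end
end

# Simplified re-creation of the callback for illustration only.
# Assumption: monitor and decorator are injected so the sketch has no gem dependencies.
class SketchStatisticsCallback
  def initialize(subscription_group_id, consumer_group_id, client_name, monitor)
    @subscription_group_id = subscription_group_id
    @consumer_group_id = consumer_group_id
    @client_name = client_name
    @monitor = monitor
    # Identity decorator: returns the statistics hash unchanged
    @statistics_decorator = ->(statistics) { statistics }
  end

  def call(statistics)
    # Skip statistics that belong to a different client
    return unless @client_name == statistics['name']

    @monitor.instrument(
      'statistics.emitted',
      subscription_group_id: @subscription_group_id,
      consumer_group_id: @consumer_group_id,
      statistics: @statistics_decorator.call(statistics)
    )
  rescue StandardError => e
    # Report the failure as an event instead of letting it propagate
    @monitor.instrument(
      'error.occurred',
      caller: self,
      subscription_group_id: @subscription_group_id,
      consumer_group_id: @consumer_group_id,
      type: 'statistics.emitted.error',
      error: e
    )
  end
end

monitor = RecordingMonitor.new
callback = SketchStatisticsCallback.new('sg_0', 'cg_example', 'client-1', monitor)
callback.call('name' => 'client-2', 'msg_cnt' => 5) # other client, ignored
callback.call('name' => 'client-1', 'msg_cnt' => 7) # our client, emitted
emitted_ids = monitor.events.map(&:first)

failing_monitor = RecordingMonitor.new(fail_on: 'statistics.emitted')
failing_callback = SketchStatisticsCallback.new('sg_0', 'cg_example', 'client-1', failing_monitor)
failing_callback.call('name' => 'client-1')
error_event = failing_monitor.events.last
```

In this sketch only the second call reaches the monitor, because its 'name' matches the client name given to the constructor. When the monitor raises during 'statistics.emitted', the exception is rescued and reported as an 'error.occurred' event with type 'statistics.emitted.error', so nothing propagates back to the caller.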