Class: Karafka::Pro::Instrumentation::PerformanceTracker

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/karafka/pro/instrumentation/performance_tracker.rb

Overview

Note:

Even if we have some race-conditions here it is relevant due to the quantity of data. This is why we do not mutex it.

Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow

Instance Method Summary collapse

Constructor Details

#initializePerformanceTracker

Builds up nested concurrent hash for data tracking



31
32
33
34
35
36
37
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 31

def initialize
  @processing_times = Hash.new do |topics_hash, topic|
    topics_hash[topic] = Hash.new do |partitions_hash, partition|
      partitions_hash[partition] = []
    end
  end
end

Instance Method Details

#on_consumer_consumed(event) ⇒ Object

Tracks time taken to process a single message of a given topic partition

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details



54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 54

def on_consumer_consumed(event)
  consumer = event[:caller]
  messages = consumer.messages
  topic = messages..topic
  partition = messages..partition

  samples = @processing_times[topic][partition]
  samples << event[:time] / messages.count

  return unless samples.size > SAMPLES_COUNT

  samples.shift
end

#processing_time_p95(topic, partition) ⇒ Float

Returns p95 processing time of a single message from a single topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Float)

    p95 processing time of a single message from a single topic partition



42
43
44
45
46
47
48
49
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 42

def processing_time_p95(topic, partition)
  values = @processing_times[topic][partition]

  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end