Class: Karafka::Pro::PerformanceTracker

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/karafka/pro/performance_tracker.rb

Overview

Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow

Instance Method Summary collapse

Constructor Details

#initializePerformanceTracker

Builds up nested concurrent hash for data tracking



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/karafka/pro/performance_tracker.rb', line 27

def initialize
  @processing_times = Concurrent::Map.new do |topics_hash, topic|
    topics_hash.compute_if_absent(topic) do
      Concurrent::Map.new do |partitions_hash, partition|
        # This array does not have to be concurrent because we always access single
        # partition data via instrumentation that operates in a single thread via consumer
        partitions_hash.compute_if_absent(partition) { [] }
      end
    end
  end
end

Instance Method Details

#on_consumer_consumed(event) ⇒ Object

Tracks time taken to process a single message of a given topic partition

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details



54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/karafka/pro/performance_tracker.rb', line 54

def on_consumer_consumed(event)
  consumer = event[:caller]
  messages = consumer.messages
  topic = messages..topic
  partition = messages..partition

  samples = @processing_times[topic][partition]
  samples << event[:time] / messages.count

  return unless samples.size > SAMPLES_COUNT

  samples.shift
end

#processing_time_p95(topic, partition) ⇒ Float

Returns p95 processing time of a single message from a single topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Float)

    p95 processing time of a single message from a single topic partition



42
43
44
45
46
47
48
49
# File 'lib/karafka/pro/performance_tracker.rb', line 42

def processing_time_p95(topic, partition)
  values = @processing_times[topic][partition]

  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end