Class: Karafka::Pro::Instrumentation::PerformanceTracker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/karafka/pro/instrumentation/performance_tracker.rb

Overview

Note:

Even if we have some race conditions here, they are irrelevant due to the quantity of data. This is why we do not guard it with a mutex.

Tracker used to keep track of performance metrics. It provides insights that can be used to optimize the processing flow.

Instance Method Summary

Constructor Details

#initialize ⇒ PerformanceTracker

Builds up nested concurrent hash for data tracking



# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 40

def initialize
  @processing_times = Hash.new do |topics_hash, topic|
    topics_hash[topic] = Hash.new do |partitions_hash, partition|
      partitions_hash[partition] = []
    end
  end
end
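
The constructor relies on `Hash.new` with a default block at two levels, so a topic and a partition entry are created on first access. A minimal standalone sketch of that pattern follows; the variable name `processing_times` and the sample topic `'orders'` are illustrative assumptions, not part of Karafka:

```ruby
# Standalone sketch of the nested auto-vivifying hash (not Karafka code).
# 'orders' and the sample values below are hypothetical.
processing_times = Hash.new do |topics_hash, topic|
  topics_hash[topic] = Hash.new do |partitions_hash, partition|
    partitions_hash[partition] = []
  end
end

# The first access to a topic/partition pair creates both levels on demand.
processing_times['orders'][0] << 12.5
processing_times['orders'][0] << 7.5
processing_times['orders'][1] << 3.0

# Reading an unseen pair also creates it, with an empty samples array.
untouched = processing_times['payments'][2]
```

Because the default blocks assign into the hash, even a read of an unknown topic or partition leaves an empty entry behind.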

Instance Method Details

#on_consumer_consumed(event) ⇒ Object

Tracks the average time taken to process a single message of a given topic partition, computed as the batch processing time divided by the number of messages in the batch.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details



# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 63

def on_consumer_consumed(event)
  consumer = event[:caller]
  messages = consumer.messages
  # Topic and partition are read from the batch metadata
  topic = messages.metadata.topic
  partition = messages.metadata.partition

  samples = @processing_times[topic][partition]
  samples << (event[:time] / messages.size)

  return unless samples.size > SAMPLES_COUNT

  samples.shift
end
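
The method above keeps a bounded, sliding window of per-message averages: each batch contributes one sample, and the oldest sample is dropped once the window is exceeded. A rough sketch of that behaviour in isolation, assuming a hypothetical `WINDOW_SIZE` of 3 in place of `SAMPLES_COUNT` (whose value is not shown here) and a hypothetical helper `record_sample`:

```ruby
# Hypothetical window size; stands in for SAMPLES_COUNT, whose real value
# is not shown in this document.
WINDOW_SIZE = 3

# Hypothetical helper mirroring the append-then-trim logic.
def record_sample(samples, batch_time, batch_size)
  samples << (batch_time / batch_size) # average time per message
  samples.shift if samples.size > WINDOW_SIZE # drop the oldest sample
  samples
end

samples = []
record_sample(samples, 100.0, 10) # adds 10.0
record_sample(samples, 90.0, 3)   # adds 30.0
record_sample(samples, 40.0, 2)   # adds 20.0
record_sample(samples, 50.0, 10)  # adds 5.0 and evicts 10.0
```

The sketch uses float times on purpose: with integer operands, `/` in Ruby performs integer division and would truncate the average.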

#processing_time_p95(topic, partition) ⇒ Float

Returns p95 processing time of a single message from a single topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Float)

    p95 processing time of a single message from a single topic partition



# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 51

def processing_time_p95(topic, partition)
  values = @processing_times[topic][partition]

  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end
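
The `percentile` helper called above is not shown in this document. As an illustration only, one common way to compute a percentile is linear interpolation between the two closest ranks of the sorted samples. The sketch below assumes that approach; the name `percentile_sketch` is hypothetical and the code is not taken from Karafka:

```ruby
# Hypothetical percentile helper using linear interpolation between ranks.
# This is an assumed technique, not the library's actual implementation.
def percentile_sketch(fraction, values)
  sorted = values.sort
  rank = fraction * (sorted.length - 1)         # fractional index into sorted
  lower = rank.floor
  upper = [lower + 1, sorted.length - 1].min    # clamp at the last element
  weight = rank - lower                         # distance past the lower rank
  sorted[lower] + weight * (sorted[upper] - sorted[lower])
end

times = [50.0, 10.0, 40.0, 20.0, 30.0]
median = percentile_sketch(0.5, times)
p95 = percentile_sketch(0.95, times)
```

With these five samples the median lands exactly on the middle element, while the p95 falls between the two largest values, close to the maximum.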