Class: Karafka::Pro::Instrumentation::PerformanceTracker
- Inherits:
- Object
- Includes:
- Singleton
- Defined in:
- lib/karafka/pro/instrumentation/performance_tracker.rb
Overview
Note:
Even if there are some race conditions here, they are irrelevant given the quantity of data. This is why we do not guard it with a mutex.
Tracker used to keep track of performance metrics. It provides insights that can be used to optimize the processing flow.
Instance Method Summary
- #initialize ⇒ PerformanceTracker (constructor)
  Builds up nested concurrent hash for data tracking.
- #on_consumer_consumed(event) ⇒ Object
  Tracks time taken to process a single message of a given topic partition.
- #processing_time_p95(topic, partition) ⇒ Float
  P95 processing time of a single message from a single topic partition.
Constructor Details
#initialize ⇒ PerformanceTracker
Builds up nested concurrent hash for data tracking.
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 40
def initialize
  @processing_times = Hash.new do |topics_hash, topic|
    topics_hash[topic] = Hash.new do |partitions_hash, partition|
      partitions_hash[partition] = []
    end
  end
end
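The nested default-block pattern used by the constructor can be tried in isolation. The following is a minimal sketch with plain Ruby hashes; the method name `build_processing_times` and the topic names are made up for illustration and are not part of the class.

```ruby
# Hypothetical helper: builds a topic => partition => samples structure
# in which missing keys are created on first access.
def build_processing_times
  Hash.new do |topics_hash, topic|
    topics_hash[topic] = Hash.new do |partitions_hash, partition|
      partitions_hash[partition] = []
    end
  end
end

times = build_processing_times

# No explicit setup is needed before appending a sample.
times['events'][0] << 1.5
times['events'][0] << 2.0
times['events'][1] << 0.7

# Reading an unknown topic/partition also stores an empty array for it.
times['other'][3]
```

Because the default block assigns into the hash, even a read-only lookup of an unseen topic and partition leaves an empty entry behind.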
Instance Method Details
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition.
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 63
def on_consumer_consumed(event)
  consumer = event[:caller]
  messages = consumer.messages
  topic = messages.metadata.topic
  partition = messages.metadata.partition

  samples = @processing_times[topic][partition]
  samples << (event[:time] / messages.size)

  return unless samples.size > SAMPLES_COUNT

  samples.shift
end
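The sampling logic above amounts to a bounded window of per-message averages: the batch time is divided by the batch size, and the oldest sample is dropped once the window is full. A minimal sketch of that idea follows. The names `record_sample` and `SKETCH_SAMPLES_COUNT` are hypothetical, and the window size of 3 is chosen only to keep the example short; the real `SAMPLES_COUNT` value is not shown in this document.

```ruby
# Hypothetical window size for this sketch only.
SKETCH_SAMPLES_COUNT = 3

# Appends the average per-message time of one batch and keeps at most
# SKETCH_SAMPLES_COUNT of the most recent samples.
def record_sample(samples, batch_time, batch_size)
  samples << (batch_time / batch_size)
  return samples unless samples.size > SKETCH_SAMPLES_COUNT

  # Drop the oldest sample so the window stays bounded.
  samples.shift
  samples
end

samples = []
record_sample(samples, 100.0, 10)
record_sample(samples, 90.0, 3)
record_sample(samples, 20.0, 4)
record_sample(samples, 8.0, 2)
```

After the fourth call the first sample has been evicted, so the window always reflects the most recent batches.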
#processing_time_p95(topic, partition) ⇒ Float
Returns p95 processing time of a single message from a single topic partition.
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 51
def processing_time_p95(topic, partition)
  values = @processing_times[topic][partition]

  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end
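The `percentile` helper called above is not shown on this page. As an illustration only, one common way to compute a percentile is linear interpolation between the two closest ranks of the sorted samples. The sketch below assumes that approach; `sketch_percentile` and `sketch_p95` are hypothetical names, and the library's own implementation may differ.

```ruby
# Hypothetical stand-in for the private `percentile` helper.
# Interpolates linearly between the two closest ranks of the sorted values.
def sketch_percentile(fraction, values)
  sorted = values.sort
  rank = fraction * (sorted.size - 1)
  lower = rank.floor
  upper = [lower + 1, sorted.size - 1].min

  sorted[lower] + (rank - lower) * (sorted[upper] - sorted[lower])
end

# Mirrors the guard clauses of processing_time_p95 for a plain array.
def sketch_p95(values)
  return 0 if values.empty?
  return values.first if values.size == 1

  sketch_percentile(0.95, values)
end

sketch_p95([])
sketch_p95([7.0])
sketch_p95([1.0, 2.0])
```

The two early returns avoid interpolating over zero or one sample, where a percentile is either undefined or trivially the single value.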