Class: Karafka::Pro::PerformanceTracker
- Inherits:
-
Object
- Object
- Karafka::Pro::PerformanceTracker
- Includes:
- Singleton
- Defined in:
- lib/karafka/pro/performance_tracker.rb
Overview
Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow
Instance Method Summary collapse
-
#initialize ⇒ PerformanceTracker
constructor
Builds up nested concurrent hash for data tracking.
-
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition.
-
#processing_time_p95(topic, partition) ⇒ Float
P95 processing time of a single message from a single topic partition.
Constructor Details
#initialize ⇒ PerformanceTracker
Builds up nested concurrent hash for data tracking
27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/karafka/pro/performance_tracker.rb', line 27 def initialize @processing_times = Concurrent::Map.new do |topics_hash, topic| topics_hash.compute_if_absent(topic) do Concurrent::Map.new do |partitions_hash, partition| # This array does not have to be concurrent because we always access single # partition data via instrumentation that operates in a single thread via consumer partitions_hash.compute_if_absent(partition) { [] } end end end end |
Instance Method Details
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition
54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/karafka/pro/performance_tracker.rb', line 54 def on_consumer_consumed(event) consumer = event[:caller] = consumer. topic = ..topic partition = ..partition samples = @processing_times[topic][partition] samples << event[:time] / .count return unless samples.size > SAMPLES_COUNT samples.shift end |
#processing_time_p95(topic, partition) ⇒ Float
Returns p95 processing time of a single message from a single topic partition.
42 43 44 45 46 47 48 49 |
# File 'lib/karafka/pro/performance_tracker.rb', line 42 def processing_time_p95(topic, partition) values = @processing_times[topic][partition] return 0 if values.empty? return values.first if values.size == 1 percentile(0.95, values) end |