Class: Karafka::Pro::Processing::AdaptiveIterator::Tracker
- Inherits:
-
Object
- Object
- Karafka::Pro::Processing::AdaptiveIterator::Tracker
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/processing/adaptive_iterator/tracker.rb
Overview
Tracker is responsible for monitoring the processing of messages within the poll interval limitation. It ensures that the consumer does not exceed the maximum poll interval by tracking the processing cost and determining when to halt further processing (if needed).
Instance Method Summary collapse
-
#enough? ⇒ Boolean
Determines if there is enough time left to process more messages without exceeding the maximum poll interval, considering both the safety margin and adaptive margin.
-
#initialize(safety_margin, last_polled_at, max_poll_interval_ms) ⇒ Tracker
constructor
Initializes a new Tracker instance.
-
#track { ... } ⇒ Object
Tracks the processing time of a block and updates the maximum processing cost.
Constructor Details
#initialize(safety_margin, last_polled_at, max_poll_interval_ms) ⇒ Tracker
Initializes a new Tracker instance.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/karafka/pro/processing/adaptive_iterator/tracker.rb', line 30 def initialize( safety_margin, last_polled_at, max_poll_interval_ms ) @safety_margin = safety_margin / 100.0 # Convert percentage to decimal @last_polled_at = last_polled_at @max_processing_cost = 0 @max_poll_interval_ms = max_poll_interval_ms end |
Instance Method Details
#enough? ⇒ Boolean
Determines if there is enough time left to process more messages without exceeding the maximum poll interval, considering both the safety margin and adaptive margin.
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/karafka/pro/processing/adaptive_iterator/tracker.rb', line 60 def enough? elapsed_time_ms = monotonic_now - @last_polled_at remaining_time_ms = @max_poll_interval_ms - elapsed_time_ms safety_margin_ms = @max_poll_interval_ms * @safety_margin return true if remaining_time_ms <= safety_margin_ms return true if remaining_time_ms - @max_processing_cost <= safety_margin_ms false end |
#track { ... } ⇒ Object
Tracks the processing time of a block and updates the maximum processing cost.
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/karafka/pro/processing/adaptive_iterator/tracker.rb', line 44 def track before = monotonic_now yield time_taken = monotonic_now - before return unless time_taken > @max_processing_cost @max_processing_cost = time_taken end |