Class: Karafka::Pro::Processing::Filters::Throttler
- Defined in:
- lib/karafka/pro/processing/filters/throttler.rb
Overview
Throttler used to limit the number of messages we can process in a given time interval. The tricky part is that even if we throttle at 100 messages and have reached exactly 100, we still need to indicate that we are throttling despite not having received message 101. Otherwise we will not pause the partition and will fetch more data that we should not process.
This is a special type of filter that always throttles and makes us wait or seek if anything is filtered out.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #action ⇒ Symbol
  Action to take upon throttler reaching certain state.
- #apply!(messages) ⇒ Object
  Limits number of messages to a range that we can process (if needed) and keeps track of how many messages we’ve processed in a given time.
- #initialize(limit, interval) ⇒ Throttler (constructor)
  A new instance of Throttler.
- #timeout ⇒ Integer
  Minimum number of milliseconds to wait before getting more messages so we are no longer throttled and so we can process at least one message.
Methods inherited from Base
#applied?, #mark_as_consumed?, #marking_method
Constructor Details
#initialize(limit, interval) ⇒ Throttler
Returns a new instance of Throttler.
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 29

    def initialize(limit, interval)
      super()
      @limit = limit
      @interval = interval
      @requests = Hash.new { |h, k| h[k] = 0 }
    end
Instance Method Details
#action ⇒ Symbol
Returns action to take upon throttler reaching certain state.
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 66

    def action
      if applied?
        timeout.zero? ? :seek : :pause
      else
        :skip
      end
    end
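The branching in #action can be illustrated in isolation. The sketch below is a minimal stand-in, not the Karafka class: `action_for` is a hypothetical helper that takes the two values #action depends on (whether the filter was applied and the remaining timeout) as plain arguments. The reading of the symbols in the comments is an assumption based on the overview: wait when time remains, seek when it does not.

```ruby
# Hypothetical helper mirroring the branching in #action.
# applied    - whether the throttler removed anything (applied? in the class)
# timeout_ms - remaining wait in milliseconds (timeout in the class)
def action_for(applied, timeout_ms)
  # Nothing was throttled, so no follow-up action is needed.
  return :skip unless applied

  # Throttled: seek if the interval has already elapsed, otherwise pause.
  timeout_ms.zero? ? :seek : :pause
end

action_for(false, 250) # => :skip
action_for(true, 0)    # => :seek
action_for(true, 250)  # => :pause
```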
#apply!(messages) ⇒ Object
Limits number of messages to a range that we can process (if needed) and keeps track of how many messages we’ve processed in a given time.
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 41

    def apply!(messages)
      @applied = false
      @cursor = nil
      @time = monotonic_now
      # Forget counts recorded before the start of the current interval
      @requests.delete_if { |timestamp, _| timestamp < (@time - @interval) }
      values = @requests.values.sum
      accepted = 0

      messages.delete_if do |message|
        # +1 because of the current message
        @applied = (values + accepted + 1) > @limit

        # Remember the first message that exceeded the limit
        @cursor = message if @applied && @cursor.nil?

        next true if @applied

        accepted += 1

        false
      end

      @requests[@time] += accepted
    end
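To see how the counting in #apply! behaves across batches, here is a minimal standalone sketch. `SketchThrottler` is a hypothetical class that copies the logic shown above but takes the current time as an explicit `now` argument (in milliseconds) instead of calling `monotonic_now`, so the run is deterministic. Plain strings stand in for messages.

```ruby
# Hypothetical stand-in for the throttler; not the Karafka class itself.
class SketchThrottler
  attr_reader :cursor

  def initialize(limit, interval)
    @limit = limit
    @interval = interval
    @requests = Hash.new { |h, k| h[k] = 0 }
    @applied = false
  end

  def applied?
    @applied
  end

  # `now` replaces monotonic_now so the example does not depend on the clock.
  def apply!(messages, now)
    @applied = false
    @cursor = nil
    # Drop counts that fall outside the current interval
    @requests.delete_if { |timestamp, _| timestamp < (now - @interval) }
    values = @requests.values.sum
    accepted = 0

    messages.delete_if do |message|
      @applied = (values + accepted + 1) > @limit
      @cursor = message if @applied && @cursor.nil?
      next true if @applied

      accepted += 1
      false
    end

    @requests[now] += accepted
  end
end

throttler = SketchThrottler.new(3, 1_000) # 3 messages per 1000 ms

first = %w[m1 m2 m3 m4 m5]
throttler.apply!(first, 0)
first_cursor = throttler.cursor     # "m4", the first message over the limit
first_applied = throttler.applied?  # true; first is now ["m1", "m2", "m3"]

second = %w[m4 m5]
throttler.apply!(second, 500)       # still inside the interval
second_applied = throttler.applied? # true; second is now []

third = %w[m4 m5]
throttler.apply!(third, 1_500)      # the count recorded at 0 has expired
third_applied = throttler.applied?  # false; third keeps both messages
```

In this sketch the cursor marks where processing would have to resume, which matches the idea of seeking or pausing once something has been removed from the batch.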
#timeout ⇒ Integer
Returns minimum number of milliseconds to wait before getting more messages so we are no longer throttled and so we can process at least one message.
# File 'lib/karafka/pro/processing/filters/throttler.rb', line 76

    def timeout
      timeout = @interval - (monotonic_now - @time)
      timeout <= 0 ? 0 : timeout
    end
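The arithmetic in #timeout can be checked with concrete numbers. The helper below is a hypothetical standalone version that takes the interval, the time of the last #apply! call, and the current time as arguments, all in milliseconds, rather than reading instance state and the clock.

```ruby
# Hypothetical helper mirroring the arithmetic in #timeout.
# interval   - throttling interval in ms
# applied_at - time of the last apply! call in ms (@time in the class)
# now        - current time in ms (monotonic_now in the class)
def remaining_timeout(interval, applied_at, now)
  remaining = interval - (now - applied_at)
  # Never return a negative wait
  remaining <= 0 ? 0 : remaining
end

remaining_timeout(1_000, 5_000, 5_300) # => 700 (300 ms elapsed, 700 ms left)
remaining_timeout(1_000, 5_000, 6_200) # => 0   (interval already elapsed)
```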