Class: Karafka::Pro::Processing::Filters::Delayer
- Defined in:
- lib/karafka/pro/processing/filters/delayer.rb
Overview
A filter that allows us to delay processing by pausing until time is right.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#action ⇒ Symbol
Action to take on post-filtering.
-
#apply!(messages) ⇒ Object
Removes too young messages.
-
#initialize(delay) ⇒ Delayer
constructor
A new instance of Delayer.
-
#timeout ⇒ Integer
Timeout delay in ms.
Methods inherited from Base
#applied?, #mark_as_consumed?, #marking_method
Constructor Details
#initialize(delay) ⇒ Delayer
Returns a new instance of Delayer.
21 22 23 24 25 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 21 def initialize(delay) super() @delay = delay end |
Instance Method Details
#action ⇒ Symbol
Returns action to take on post-filtering.
61 62 63 64 65 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 61 def action return :skip unless applied? timeout <= 0 ? :seek : :pause end |
#apply!(messages) ⇒ Object
Removes too young messages
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 30 def apply!() @applied = false @cursor = nil # Time on message is in seconds with ms precision, so we need to convert the ttl that # is in ms to this format border = ::Time.now.utc - @delay / 1_000.0 .delete_if do || too_young = . > border if too_young @applied = true @cursor ||= end @applied end end |
#timeout ⇒ Integer
Returns timeout delay in ms.
52 53 54 55 56 57 58 |
# File 'lib/karafka/pro/processing/filters/delayer.rb', line 52 def timeout return 0 unless @cursor timeout = (@delay / 1_000.0) - (::Time.now.utc - @cursor.) timeout <= 0 ? 0 : timeout * 1_000 end |