Class: Karafka::Pro::Processing::Filters::Expirer
- Defined in:
- lib/karafka/pro/processing/filters/expirer.rb
Overview
Expirer for removing too old messages. It never moves offsets in any way and does not impact the processing flow. It always runs :skip action.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#apply!(messages) ⇒ Object
Removes too old messages.
-
#initialize(ttl) ⇒ Expirer
constructor
A new instance of Expirer.
-
#timeout ⇒ nil
This filter does not deal with timeouts.
Methods inherited from Base
#action, #applied?, #mark_as_consumed?, #marking_cursor, #marking_method
Constructor Details
#initialize(ttl) ⇒ Expirer
Returns a new instance of Expirer.
32 33 34 35 36 |
# File 'lib/karafka/pro/processing/filters/expirer.rb', line 32 def initialize(ttl) super() @ttl = ttl end |
Instance Method Details
#apply!(messages) ⇒ Object
Removes too old messages
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/karafka/pro/processing/filters/expirer.rb', line 41 def apply!() @applied = false # Time on message is in seconds with ms precision, so we need to convert the ttl that # is in ms to this format border = Time.now.utc - (@ttl / 1_000.to_f) .delete_if do || too_old = . < border @applied = true if too_old too_old end end |
#timeout ⇒ nil
Returns this filter does not deal with timeouts.
58 59 60 |
# File 'lib/karafka/pro/processing/filters/expirer.rb', line 58 def timeout nil end |