Class: Karafka::Pro::Processing::Filters::Base
- Inherits:
- Object
- Karafka::Pro::Processing::Filters::Base
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/pro/processing/filters/base.rb
Overview
Base for all the filters. All filters (including custom) need to use this API.
Because filters can limit data in ways that require pausing or seeking (throttling, for example), the API is not just "remove some things from the batch": it also provides ways to control the post-filtering operations that may be needed.
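As a rough illustration of the contract described above, here is a minimal sketch of a custom filter. It assumes a stand-in `BaseFilter` class that only mirrors the methods documented on this page (so the snippet runs without Karafka Pro), a hypothetical `Message` struct in place of `Karafka::Messages::Message`, and a hypothetical `EmptyPayloadFilter`. Mutating the batch in place with `delete_if` is an assumption of this sketch.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (assumption: it only
# mirrors the methods documented on this page).
class BaseFilter
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def apply!(_messages)
    raise NotImplementedError, 'Implement in a subclass'
  end

  def action
    :skip
  end

  def applied?
    @applied
  end
end

# Hypothetical message stand-in with just the fields this sketch needs.
Message = Struct.new(:offset, :payload)

# Hypothetical custom filter: removes messages whose payload is empty.
class EmptyPayloadFilter < BaseFilter
  def apply!(messages)
    # Reset state, since one filter instance may see many batches.
    @applied = false
    @cursor = nil

    messages.delete_if do |message|
      empty = message.payload.to_s.empty?
      @applied = true if empty
      empty
    end
  end
end

messages = [Message.new(0, 'a'), Message.new(1, ''), Message.new(2, 'b')]
filter = EmptyPayloadFilter.new
filter.apply!(messages)
```

Because this filter only drops data and never needs to pause or seek, it keeps the default `:skip` action and leaves the cursor as nil.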
Direct Known Subclasses
Delayer, Expirer, InlineInsightsDelayer, Throttler, VirtualLimiter
Instance Attribute Summary
- #cursor ⇒ Karafka::Messages::Message? (readonly)
  The message to use as a cursor when pausing or seeking, or nil if not applicable.
Instance Method Summary
- #action ⇒ Symbol
  Filter post-execution action on consumer.
- #applied? ⇒ Boolean
  Whether this filter changed the messages in any way.
- #apply!(messages) ⇒ Object
- #initialize ⇒ Base (constructor)
  A new instance of Base.
- #mark_as_consumed? ⇒ Boolean
  Whether the cursor value should be used to mark as consumed.
- #marking_method ⇒ Symbol
  `:mark_as_consumed` or `:mark_as_consumed!`.
- #timeout ⇒ Integer
  Default timeout for pausing (if applicable).
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
# File 'lib/karafka/pro/processing/filters/base.rb', line 31
def initialize
  @applied = false
  @cursor = nil
end
Instance Attribute Details
#cursor ⇒ Karafka::Messages::Message? (readonly)
Returns the message to use as a cursor when pausing or seeking, or nil if not applicable.
# File 'lib/karafka/pro/processing/filters/base.rb', line 27
def cursor
  @cursor
end
Instance Method Details
#action ⇒ Symbol
Returns filter post-execution action on consumer. Either `:skip`, `:pause` or `:seek`.
# File 'lib/karafka/pro/processing/filters/base.rb', line 44
def action
  :skip
end
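To show how `#action`, `#cursor` and `#timeout` can work together, the following sketch uses a hypothetical `BatchLimiter` that lets only the first few messages of a batch through and asks for a pause from the first removed message. The `BaseFilter` stand-in, the `Message` struct, and the `limit` and `pause_ms` parameters are all assumptions made for illustration. This is not the logic of the `Throttler` subclass listed above.

```ruby
# Stand-in for Karafka::Pro::Processing::Filters::Base (assumption: it only
# mirrors the methods documented on this page).
class BaseFilter
  attr_reader :cursor

  def initialize
    @applied = false
    @cursor = nil
  end

  def action
    :skip
  end

  def applied?
    @applied
  end

  def timeout
    0
  end
end

# Hypothetical message stand-in.
Message = Struct.new(:offset)

# Hypothetical filter: keeps at most `limit` messages per batch and requests
# a pause of `pause_ms` milliseconds when it had to cut the batch.
class BatchLimiter < BaseFilter
  def initialize(limit, pause_ms)
    super()
    @limit = limit
    @pause_ms = pause_ms
  end

  def apply!(messages)
    @applied = false
    @cursor = nil
    return if messages.size <= @limit

    # The first message we remove is where processing should resume.
    @cursor = messages[@limit]
    @applied = true
    messages.slice!(@limit..-1)
  end

  def action
    applied? ? :pause : :skip
  end

  def timeout
    applied? ? @pause_ms : 0
  end
end

batch = (0..4).map { |offset| Message.new(offset) }
limiter = BatchLimiter.new(3, 500)
limiter.apply!(batch)
```

When the batch is not cut, the filter falls back to `:skip` and a zero timeout, matching the base class defaults.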
#applied? ⇒ Boolean
Returns whether this filter changed the messages in any way.
# File 'lib/karafka/pro/processing/filters/base.rb', line 49
def applied?
  @applied
end
#apply!(messages) ⇒ Object
# File 'lib/karafka/pro/processing/filters/base.rb', line 38
def apply!(messages)
  raise NotImplementedError, 'Implement in a subclass'
end
#mark_as_consumed? ⇒ Boolean
Returns whether the cursor value should be used to mark as consumed. If any of the filters returns true, we return the lowest applicable cursor value (if any).
# File 'lib/karafka/pro/processing/filters/base.rb', line 60
def mark_as_consumed?
  false
end
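The "lowest applicable cursor" rule stated above can be sketched as follows. This is only an illustration of the sentence in this section, not Karafka's internal code: `marking_cursor`, `FilterStub` and `Message` are hypothetical names, and "lowest" is assumed to mean the cursor with the smallest offset.

```ruby
# Hypothetical message stand-in.
Message = Struct.new(:offset)

# Hypothetical filter stand-in exposing only what this sketch needs.
FilterStub = Struct.new(:cursor, :mark) do
  def mark_as_consumed?
    mark
  end
end

# Returns the cursor with the lowest offset if any filter requests marking,
# or nil when no filter requests it or no cursor is available.
def marking_cursor(filters)
  return nil unless filters.any?(&:mark_as_consumed?)

  filters.map(&:cursor).compact.min_by(&:offset)
end

filters = [
  FilterStub.new(Message.new(7), false),
  FilterStub.new(Message.new(3), true),
  FilterStub.new(nil, false)
]

lowest = marking_cursor(filters)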
#marking_method ⇒ Symbol
Returns `:mark_as_consumed` or `:mark_as_consumed!`. Applicable only if marking is requested.
# File 'lib/karafka/pro/processing/filters/base.rb', line 66
def marking_method
  :mark_as_consumed
end
#timeout ⇒ Integer
Returns default timeout for pausing (if applicable).
# File 'lib/karafka/pro/processing/filters/base.rb', line 54
def timeout
  0
end