Class: Karafka::Pro::Processing::Filters::InlineInsightsDelayer
- Defined in:
- lib/karafka/pro/processing/filters/inline_insights_delayer.rb
Overview
Delayer that checks if we have appropriate insights available. If not, pauses for 5 seconds so the insights can be loaded from the broker.
In case it would take more than five seconds to load insights, it will just pause again
This filter ensures, that we always have inline insights that a consumer can use
It is relevant in most cases only during the process start, when first poll may not yield statistics yet but will give some data.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#action ⇒ Object
Pause when we had to back-off or skip if delay is not needed.
-
#apply!(messages) ⇒ Object
Pauses if inline insights would not be available.
-
#initialize(topic, partition) ⇒ InlineInsightsDelayer
constructor
A new instance of InlineInsightsDelayer.
-
#timeout ⇒ Integer?
Ms timeout in case of pause or nil if not delaying.
Methods inherited from Base
#applied?, #mark_as_consumed?, #marking_cursor, #marking_method
Constructor Details
#initialize(topic, partition) ⇒ InlineInsightsDelayer
Returns a new instance of InlineInsightsDelayer.
44 45 46 47 48 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 44 def initialize(topic, partition) super() @topic = topic @partition = partition end |
Instance Method Details
#action ⇒ Object
Pause when we had to back-off or skip if delay is not needed
80 81 82 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 80 def action applied? ? :pause : :skip end |
#apply!(messages) ⇒ Object
Pauses if inline insights would not be available. Does nothing otherwise
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 53 def apply!() @applied = false @cursor = .first # Nothing to do if there were no messages # This can happen when we chain filters return unless @cursor insights = Karafka::Processing::InlineInsights::Tracker.find( @topic, @partition ) # If insights are available, also nothing to do here and we can just process return unless insights.empty? .clear @applied = true end |
#timeout ⇒ Integer?
Returns ms timeout in case of pause or nil if not delaying.
75 76 77 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 75 def timeout (@cursor && applied?) ? PAUSE_TIMEOUT : nil end |