Class: Karafka::Pro::Processing::Filters::InlineInsightsDelayer
- Defined in:
- lib/karafka/pro/processing/filters/inline_insights_delayer.rb
Overview
Delayer that checks if we have appropriate insights available. If not, pauses for 5 seconds so the insights can be loaded from the broker.
In case it would take more than five seconds to load insights, it will just pause again
This filter ensures, that we always have inline insights that a consumer can use
It is relevant in most cases only during the process start, when first poll may not yield statistics yet but will give some data.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#action ⇒ Object
Pause when we had to back-off or skip if delay is not needed.
-
#apply!(messages) ⇒ Object
Pauses if inline insights would not be available.
-
#initialize(topic, partition) ⇒ InlineInsightsDelayer
constructor
A new instance of InlineInsightsDelayer.
-
#timeout ⇒ Integer
Ms timeout in case of pause.
Methods inherited from Base
#applied?, #mark_as_consumed?, #marking_method
Constructor Details
#initialize(topic, partition) ⇒ InlineInsightsDelayer
Returns a new instance of InlineInsightsDelayer.
35 36 37 38 39 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 35 def initialize(topic, partition) super() @topic = topic @partition = partition end |
Instance Method Details
#action ⇒ Object
Pause when we had to back-off or skip if delay is not needed
71 72 73 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 71 def action applied? ? :pause : :skip end |
#apply!(messages) ⇒ Object
Pauses if inline insights would not be available. Does nothing otherwise
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 44 def apply!() @applied = false @cursor = .first # Nothing to do if there were no messages # This can happen when we chain filters return unless @cursor insights = ::Karafka::Processing::InlineInsights::Tracker.find( @topic, @partition ) # If insights are available, also nothing to do here and we can just process return unless insights.empty? .clear @applied = true end |
#timeout ⇒ Integer
Returns ms timeout in case of pause.
66 67 68 |
# File 'lib/karafka/pro/processing/filters/inline_insights_delayer.rb', line 66 def timeout @cursor && applied? ? PAUSE_TIMEOUT : 0 end |