Module: Karafka::Pro::Processing::Strategies::Ftr::Default
- Includes:
- Default
- Included in:
- Dlq::Ftr, Dlq::FtrLrjMom, Dlq::FtrMom, Vp, Lrj::Ftr, Mom::Ftr
- Defined in:
- lib/karafka/pro/processing/strategies/ftr/default.rb
Overview
Only filtering enabled
Constant Summary collapse
- FEATURES =
Just filtering enabled
%i[ filtering ].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
Standard flow without any features.
-
#handle_idle ⇒ Object
Empty run when running on idle means we need to filter.
-
#handle_post_filtering ⇒ Boolean
Throttles by pausing for an expected time period if throttling is needed or seeks in case the throttle expired.
Methods included from Default
#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_as_consumed, #mark_as_consumed!, #mark_in_memory, #mark_in_transaction, #mark_with_transaction, #store_offset_metadata, #transaction
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_consume, #handle_revoked, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow without any features
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 35 def handle_after_consume coordinator.on_finished do || return if revoked? if coordinator.success? coordinator.pause_tracker.reset # Do not mark last message if pause happened. This prevents a scenario where # pause is overridden upon rebalance by marking return if coordinator.manual_pause? mark_as_consumed() handle_post_filtering else retry_after_pause end end end |
#handle_idle ⇒ Object
Empty run when running on idle means we need to filter
30 31 32 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 30 def handle_idle handle_post_filtering end |
#handle_post_filtering ⇒ Boolean
Throttles by pausing for an expected time period if throttling is needed or seeks in case the throttle expired. Throttling may expire because we throttle before processing starts and we need to compensate for processing time. It may turn out that we don’t have to pause but we need to move the offset because we skipped some messages due to throttling filtering.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 61 def handle_post_filtering filter = coordinator.filter # We pick the timeout before the action because every action takes time. This time # may then mean we end up having throttle time equal to zero when pause is needed # and this should not happen throttle_timeout = filter.timeout # If user requested marking when applying filter, we mark. We may be in the user # flow but even then this is not a problem. Older offsets will be ignored since # we do not force the offset update (expected) and newer are on the user to control. # This can be primarily used when filtering large quantities of data to mark on the # idle runs, so lag reporting is aware that those messages were not consumed but also # are no longer relevant if filter.mark_as_consumed? send( filter.marking_method, filter.cursor ) end case filter.action when :skip nil when :seek # User direct actions take priority over automatic operations # If we've already seeked we can just resume operations, nothing extra needed return resume if coordinator.manual_seek? = filter.cursor monitor.instrument( 'filtering.seek', caller: self, message: ) do seek(.offset, false) end resume when :pause # User direct actions take priority over automatic operations return nil if coordinator.manual_pause? = filter.cursor monitor.instrument( 'filtering.throttled', caller: self, message: , timeout: throttle_timeout ) do pause(.offset, throttle_timeout, false) end else raise Karafka::Errors::UnsupportedCaseError filter.action end end |