Module: Karafka::Pro::Processing::Strategies::Ftr::Default
- Includes:
- Default
- Included in:
- Dlq::Ftr, Dlq::FtrLrjMom, Dlq::FtrMom, Vp, Lrj::Ftr, Mom::Ftr
- Defined in:
- lib/karafka/pro/processing/strategies/ftr/default.rb
Overview
Only filtering enabled
Constant Summary collapse
- FEATURES =
Just filtering enabled
%i[ filtering ].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
Standard flow without any features.
-
#handle_idle ⇒ Object
Empty run when running on idle means we need to filter.
-
#handle_post_filtering ⇒ Boolean
Throttles by pausing for an expected time period if throttling is needed or seeks in case the throttle expired.
Methods included from Default
#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_as_consumed, #mark_as_consumed!, #mark_in_memory, #mark_in_transaction, #mark_with_transaction, #store_offset_metadata, #transaction
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_consume, #handle_revoked, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow without any features
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 44 def handle_after_consume coordinator.on_finished do || return if revoked? if coordinator.success? coordinator.pause_tracker.reset # Do not mark last message if pause happened. This prevents a scenario where # pause is overridden upon rebalance by marking return if coordinator.manual_pause? mark_as_consumed() handle_post_filtering else retry_after_pause end end end |
#handle_idle ⇒ Object
Empty run when running on idle means we need to filter
39 40 41 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 39 def handle_idle handle_post_filtering end |
#handle_post_filtering ⇒ Boolean
Throttles by pausing for an expected time period if throttling is needed or seeks in case the throttle expired. Throttling may expire because we throttle before processing starts and we need to compensate for processing time. It may turn out that we don’t have to pause but we need to move the offset because we skipped some messages due to throttling filtering.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/karafka/pro/processing/strategies/ftr/default.rb', line 70 def handle_post_filtering filter = coordinator.filter # We pick the timeout before the action because every action takes time. This time # may then mean we end up having throttle time equal to zero when pause is needed # and this should not happen throttle_timeout = filter.timeout # If user requested marking when applying filter, we mark. We may be in the user # flow but even then this is not a problem. Older offsets will be ignored since # we do not force the offset update (expected) and newer are on the user to control. # This can be primarily used when filtering large quantities of data to mark on the # idle runs, so lag reporting is aware that those messages were not consumed but also # are no longer relevant if filter.mark_as_consumed? send( filter.marking_method, filter.marking_cursor ) end case filter.action when :skip nil when :seek # User direct actions take priority over automatic operations # If we've already seeked we can just resume operations, nothing extra needed return resume if coordinator.manual_seek? = filter.cursor monitor.instrument( "filtering.seek", caller: self, message: ) do seek(.offset, false) end resume when :pause # User direct actions take priority over automatic operations return nil if coordinator.manual_pause? = filter.cursor monitor.instrument( "filtering.throttled", caller: self, message: , timeout: throttle_timeout ) do pause(.offset, throttle_timeout, false) end else raise Karafka::Errors::UnsupportedCaseError filter.action end end |