Module: Karafka::Pro::Processing::Strategies::Default
- Includes:
- Base, Karafka::Processing::Strategies::Default
- Included in:
- Aj::DlqLrjMom, Aj::DlqMomVp, Aj::LrjMomVp, Aj::MomVp, Karafka::Pro::Processing::Strategies::Dlq::Default, Ftr::Default, Lrj::Default, Lrj::Mom, Mom::Default, Vp::Default
- Defined in:
- lib/karafka/pro/processing/strategies/default.rb
Overview
No features enabled. No manual offset management No long running jobs No virtual partitions Nothing. Just standard, automatic flow
Constant Summary collapse
- FEATURES =
Apply strategy for a non-feature based flow
%i[].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
Standard flow without any features.
-
#handle_before_consume ⇒ Object
Increment number of attempts per one “full” job.
-
#handle_before_enqueue ⇒ Object
No actions needed for the standard flow here.
-
#handle_consume ⇒ Object
Run the user consumption code.
-
#handle_revoked ⇒ Object
Standard.
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_idle, #handle_shutdown, #mark_as_consumed, #mark_as_consumed!
Methods included from Karafka::Processing::Strategies::Base
#handle_idle, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow without any features
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 70 def handle_after_consume coordinator.on_finished do || return if revoked? if coordinator.success? coordinator.pause_tracker.reset # Do not mark last message if pause happened. This prevents a scenario where pause # is overridden upon rebalance by marking return if coordinator.manual_pause? mark_as_consumed() else retry_after_pause end end end |
#handle_before_consume ⇒ Object
Increment number of attempts per one “full” job. For all VP on a single topic partition this also should run once.
37 38 39 40 41 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 37 def handle_before_consume coordinator.on_started do coordinator.pause_tracker.increment end end |
#handle_before_enqueue ⇒ Object
No actions needed for the standard flow here
31 32 33 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 31 def handle_before_enqueue nil end |
#handle_consume ⇒ Object
Run the user consumption code
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 44 def handle_consume # We should not run the work at all on a partition that was revoked # This can happen primarily when an LRJ job gets to the internal worker queue and # this partition is revoked prior processing. unless revoked? Karafka.monitor.instrument('consumer.consume', caller: self) Karafka.monitor.instrument('consumer.consumed', caller: self) do consume end end # Mark job as successful coordinator.success!(self) rescue StandardError => e # If failed, mark as failed coordinator.failure!(self, e) # Re-raise so reported in the consumer raise e ensure # We need to decrease number of jobs that this coordinator coordinates as it has # finished coordinator.decrement end |
#handle_revoked ⇒ Object
Standard
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 89 def handle_revoked coordinator.on_revoked do resume coordinator.revoke end Karafka.monitor.instrument('consumer.revoke', caller: self) Karafka.monitor.instrument('consumer.revoked', caller: self) do revoked end end |