Module: Karafka::Processing::Strategies::Default
- Includes:
- Base
- Included in:
- Karafka::Pro::Processing::Strategies::Default, Dlq, Mom
- Defined in:
- lib/karafka/processing/strategies/default.rb
Overview
No features enabled. No manual offset management No long running jobs Nothing. Just standard, automatic flow
Constant Summary collapse
- FEATURES =
Apply strategy for a non-feature based flow
%i[].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok.
-
#handle_before_consume ⇒ Object
Increment number of attempts.
-
#handle_before_enqueue ⇒ Object
No actions needed for the standard flow here.
-
#handle_consume ⇒ Object
Run the user consumption code.
-
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition.
-
#handle_shutdown ⇒ Object
Runs the shutdown code.
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok. If there was a processing error, we will pause and continue from the next message (next that is +1 from the last one that was successfully marked as consumed)
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/karafka/processing/strategies/default.rb', line 49 def handle_after_consume return if revoked? if coordinator.success? coordinator.pause_tracker.reset # We should not move the offset automatically when the partition was paused # If we would not do this upon a revocation during the pause time, a different process # would pick not from the place where we paused but from the offset that would be # automatically committed here return if coordinator.manual_pause? mark_as_consumed(.last) else retry_after_pause end end |
#handle_before_consume ⇒ Object
Increment number of attempts
22 23 24 |
# File 'lib/karafka/processing/strategies/default.rb', line 22 def handle_before_consume coordinator.pause_tracker.increment end |
#handle_before_enqueue ⇒ Object
No actions needed for the standard flow here
17 18 19 |
# File 'lib/karafka/processing/strategies/default.rb', line 17 def handle_before_enqueue nil end |
#handle_consume ⇒ Object
Run the user consumption code
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/karafka/processing/strategies/default.rb', line 27 def handle_consume Karafka.monitor.instrument('consumer.consume', caller: self) Karafka.monitor.instrument('consumer.consumed', caller: self) do consume end # Mark job as successful coordinator.consumption(self).success! rescue StandardError => e # If failed, mark as failed coordinator.consumption(self).failure!(e) # Re-raise so reported in the consumer raise e ensure # We need to decrease number of jobs that this coordinator coordinates as it has finished coordinator.decrement end |
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition. Otherwise the underlying librdkafka would not know we may want to continue processing and the pause could in theory last forever
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/karafka/processing/strategies/default.rb', line 70 def handle_revoked resume coordinator.revoke Karafka.monitor.instrument('consumer.revoke', caller: self) Karafka.monitor.instrument('consumer.revoked', caller: self) do revoked end end |