Module: Karafka::Pro::Processing::Strategies::AjDlqMom
- Includes:
- DlqMom
- Included in:
- AjDlqLrjMom
- Defined in:
- lib/karafka/pro/processing/strategies/aj_dlq_mom.rb
Overview
ActiveJob enabled DLQ enabled Manual offset management enabled
AJ has manual offset management on by default and the offset management is delegated to the AJ consumer. This means, we cannot mark as consumed always. We can only mark as consumed when we skip given job upon errors. In all the other scenarios marking as consumed needs to happen in the AJ consumer on a per job basis.
Constant Summary collapse
- FEATURES =
Features for this strategy
%i[ active_job dead_letter_queue manual_offset_management ].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
How should we post-finalize consumption.
Methods included from Dlq
#dispatch_to_dlq, #dispatch_to_dlq?, #find_skippable_message
Methods included from Default
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked
Methods included from Karafka::Processing::Strategies::Default
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
How should we post-finalize consumption.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/karafka/pro/processing/strategies/aj_dlq_mom.rb', line 37 def handle_after_consume coordinator.on_finished do return if revoked? if coordinator.success? # Do NOT commit offsets, they are comitted after each job in the AJ consumer. coordinator.pause_tracker.reset elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries retry_after_pause else coordinator.pause_tracker.reset = dispatch_to_dlq() if dispatch_to_dlq? # We can commit the offset here because we know that we skip it "forever" and # since AJ consumer commits the offset after each job, we also know that the # previous job was successful mark_as_consumed() pause(coordinator.seek_offset, nil, false) end end end |