Module: Karafka::Processing::Strategies::Dlq
Overview
When using dead letter queue, processing won’t stop after defined number of retries upon encountering non-critical errors but the messages that error will be moved to a separate topic with their payload and metadata, so they can be handled differently.
Constant Summary collapse
- FEATURES =
Apply strategy when only dead letter queue is turned on
%i[ dead_letter_queue ].freeze
Instance Method Summary collapse
-
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings.
-
#find_skippable_message ⇒ Object
Finds the message we want to skip.
-
#handle_after_consume ⇒ Object
When manual offset management is on, we do not mark anything as consumed automatically and we rely on the user to figure things out.
Methods included from Default
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown
Methods included from Base
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown
Instance Method Details
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings
63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 63 def dispatch_to_dlq() producer.produce_async( topic: topic.dead_letter_queue.topic, payload: .raw_payload ) # Notify about dispatch on the events bus Karafka.monitor.instrument( 'dead_letter_queue.dispatched', caller: self, message: ) end |
#find_skippable_message ⇒ Object
Finds the message we want to skip
54 55 56 57 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 54 def = .find { || .offset == coordinator.seek_offset } || raise(Errors::SkipMessageNotFoundError, topic.name) end |
#handle_after_consume ⇒ Object
When manual offset management is on, we do not mark anything as consumed automatically and we rely on the user to figure things out
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/karafka/processing/strategies/dlq.rb', line 19 def handle_after_consume return if revoked? if coordinator.success? coordinator.pause_tracker.reset return if coordinator.manual_pause? mark_as_consumed(.last) elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries retry_after_pause # If we've reached number of retries that we could, we need to skip the first message # that was not marked as consumed, pause and continue, while also moving this message # to the dead topic else # We reset the pause to indicate we will now consider it as "ok". coordinator.pause_tracker.reset = # Send skippable message to the dql topic dispatch_to_dlq() # We mark the broken message as consumed and move on mark_as_consumed() return if revoked? # We pause to backoff once just in case. pause(coordinator.seek_offset, nil, false) end end |