Module: Karafka::Pro::Processing::Strategies::Dlq
- Includes:
- Default
- Defined in:
- lib/karafka/pro/processing/strategies/dlq.rb
Overview
Strategy used when only the dead letter queue (DLQ) feature is enabled.
Constant Summary
- FEATURES =
Features for this strategy
%i[ dead_letter_queue ].freeze
Instance Method Summary
- #dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings.
- #dispatch_to_dlq? ⇒ Boolean
Whether the message should be dispatched to the DLQ.
- #find_skippable_message ⇒ Karafka::Messages::Message
Finds the message we may want to skip: the one whose offset matches the coordinator's seek offset.
- #handle_after_consume ⇒ Object
When we encounter a non-recoverable message, we skip it and go on with our lives.
Methods included from Default
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked
Methods included from Karafka::Processing::Strategies::Default
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown
Instance Method Details
#dispatch_to_dlq(skippable_message) ⇒ Object
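Moves the broken message into a separate queue (the dead letter queue topic) defined via the settings. The original topic, partition, and offset are added to the message headers, and a dead_letter_queue.dispatched event is published once the dispatch has been requested.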
# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 66

def dispatch_to_dlq(skippable_message)
  producer.produce_async(
    topic: topic.dead_letter_queue.topic,
    payload: skippable_message.raw_payload,
    key: skippable_message.partition.to_s,
    headers: skippable_message.headers.merge(
      'original_topic' => topic.name,
      'original_partition' => skippable_message.partition.to_s,
      'original_offset' => skippable_message.offset.to_s
    )
  )

  # Notify about dispatch on the events bus
  Karafka.monitor.instrument(
    'dead_letter_queue.dispatched',
    caller: self,
    message: skippable_message
  )
end
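The shape of the dispatched message can be illustrated without a running Karafka process. The following is a minimal sketch, not the library's implementation: DlqMessage and build_dlq_dispatch are hypothetical stand-ins, and the topic names are made up. It only shows how the original coordinates end up in the headers while the existing headers are preserved.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message (illustration only).
DlqMessage = Struct.new(:raw_payload, :partition, :offset, :headers, keyword_init: true)

# Hypothetical helper: builds the arguments that would be passed to
# producer.produce_async. Topic names are plain strings in this sketch.
def build_dlq_dispatch(message, dlq_topic:, source_topic:)
  {
    topic: dlq_topic,
    payload: message.raw_payload,
    # The key is the source partition number as a string
    key: message.partition.to_s,
    # Hash#merge returns a new hash, so the message's own headers stay untouched
    headers: message.headers.merge(
      'original_topic' => source_topic,
      'original_partition' => message.partition.to_s,
      'original_offset' => message.offset.to_s
    )
  }
end

broken = DlqMessage.new(
  raw_payload: '{"id":1',
  partition: 3,
  offset: 42,
  headers: { 'trace_id' => 'abc' }
)

dispatch = build_dlq_dispatch(broken, dlq_topic: 'dead_messages', source_topic: 'orders')
```

A consumer of the DLQ topic can then read original_topic, original_partition, and original_offset from the headers to locate the message in its source topic.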
#dispatch_to_dlq? ⇒ Boolean
Returns whether the message should be dispatched to the DLQ. When the dispatch topic is set to false, the dispatch is skipped and the broken message is effectively ignored without any further action.

# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 89

def dispatch_to_dlq?
  topic.dead_letter_queue.topic
end
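Note that the predicate returns the configured topic value itself rather than a strict true or false, so it relies on Ruby truthiness. A minimal sketch of that behaviour, assuming a hypothetical DlqConfig struct in place of the real topic settings object:

```ruby
# Hypothetical stand-in for the topic's dead letter queue settings.
DlqConfig = Struct.new(:topic, :max_retries, keyword_init: true)

# Mirrors the predicate above: the configured topic doubles as the on/off switch.
def dispatch_to_dlq?(dlq_config)
  dlq_config.topic
end

with_topic = DlqConfig.new(topic: 'dead_messages', max_retries: 2)
skip_only = DlqConfig.new(topic: false, max_retries: 2)

# A topic name is truthy, false is falsy
actions = [with_topic, skip_only].map do |config|
  dispatch_to_dlq?(config) ? :dispatch_then_skip : :skip_only
end
```

With the second configuration the broken message is still skipped after the retries are exhausted, it is just never produced anywhere.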
#find_skippable_message ⇒ Karafka::Messages::Message
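Finds the message we may want to skip: the one in the current batch whose offset matches the coordinator's seek offset. Raises Errors::SkipMessageNotFoundError when no such message exists.

# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 57

def find_skippable_message
  skippable_message = messages.find { |msg| msg.offset == coordinator.seek_offset }
  skippable_message || raise(Errors::SkipMessageNotFoundError, topic.name)
end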
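The lookup is a plain search by offset. As a rough sketch under stated assumptions (BatchMessage, the local SkipMessageNotFoundError class, and the explicit arguments are hypothetical; in the strategy the batch and the seek offset come from the consumer and its coordinator):

```ruby
# Hypothetical stand-ins (illustration only).
BatchMessage = Struct.new(:offset, :raw_payload, keyword_init: true)
class SkipMessageNotFoundError < StandardError; end

# Returns the message at seek_offset or raises, naming the topic in the error.
def find_skippable_message(messages, seek_offset, topic_name)
  skippable = messages.find { |msg| msg.offset == seek_offset }
  skippable || raise(SkipMessageNotFoundError, topic_name)
end

batch = (10..14).map do |offset|
  BatchMessage.new(offset: offset, raw_payload: "payload-#{offset}")
end

# Offsets 10 and 11 were already marked as consumed, so the seek offset is 12
found = find_skippable_message(batch, 12, 'orders')

# An offset outside the batch raises instead of returning nil
missing_error =
  begin
    find_skippable_message(batch, 99, 'orders')
    nil
  rescue SkipMessageNotFoundError => e
    e
  end
```

Raising instead of returning nil makes the failure explicit at the lookup, so a nil never reaches the dispatch and marking steps.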
#handle_after_consume ⇒ Object
When we encounter a non-recoverable message, we skip it and go on with our lives. Processing is retried while the pause tracker's attempt count does not exceed max_retries. After that, the message is dispatched to the DLQ (if a topic is configured) and marked as consumed, and consumption is paused at the coordinator's seek offset before continuing.

# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 28

def handle_after_consume
  coordinator.on_finished do
    return if revoked?

    if coordinator.success?
      coordinator.pause_tracker.reset

      return if coordinator.manual_pause?

      mark_as_consumed(messages.last)
    elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries
      retry_after_pause
    # If we've reached number of retries that we could, we need to skip the first message
    # that was not marked as consumed, pause and continue, while also moving this message
    # to the dead topic
    else
      # We reset the pause to indicate we will now consider it as "ok".
      coordinator.pause_tracker.reset
      skippable_message = find_skippable_message
      dispatch_to_dlq(skippable_message) if dispatch_to_dlq?
      mark_as_consumed(skippable_message)
      pause(coordinator.seek_offset, nil, false)
    end
  end
end
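The branching can be summarised as a small decision function. This is only a sketch: after_consume_action and its symbols are hypothetical names, the revoked check is left out, and the inputs are passed in directly instead of being read from the coordinator and topic settings.

```ruby
# Hypothetical helper that names the branch taken once a batch has finished.
def after_consume_action(success:, manual_pause:, attempt:, max_retries:)
  if success
    # On success nothing is marked when the user paused manually
    manual_pause ? :leave_offsets_alone : :mark_last_as_consumed
  elsif attempt <= max_retries
    :retry_after_pause
  else
    # Retries exhausted: dispatch (if configured), mark as consumed, pause, continue
    :dispatch_and_skip
  end
end

outcomes = {
  ok: after_consume_action(success: true, manual_pause: false, attempt: 0, max_retries: 2),
  paused: after_consume_action(success: true, manual_pause: true, attempt: 0, max_retries: 2),
  at_limit: after_consume_action(success: false, manual_pause: false, attempt: 2, max_retries: 2),
  over_limit: after_consume_action(success: false, manual_pause: false, attempt: 3, max_retries: 2)
}
```

Because the comparison is `<=`, an attempt count equal to max_retries still results in a retry; only a count above the limit leads to the skip.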