Module: Karafka::Pro::Processing::Strategies::Dlq::Default
- Defined in:
- lib/karafka/pro/processing/strategies/dlq/default.rb
Overview
Only dead letter queue enabled
Constant Summary collapse
- FEATURES =
Features for this strategy
%i[ dead_letter_queue ].freeze
Instance Method Summary collapse
-
#build_dlq_message(skippable_message) ⇒ Hash
Dispatch DLQ message.
-
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings.
-
#dispatch_to_dlq? ⇒ Boolean
Should we dispatch the message to DLQ or not.
-
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message may want to skip (all, starting from first).
-
#handle_after_consume ⇒ Object
When we encounter non-recoverable message, we skip it and go on with our lives.
Methods included from Karafka::Pro::Processing::Strategies::Default
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown, #mark_as_consumed, #mark_as_consumed!
Methods included from Karafka::Processing::Strategies::Base
#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#build_dlq_message(skippable_message) ⇒ Hash
Returns dispatch DLQ message.
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 99 def () original_partition = .partition.to_s = { topic: topic.dead_letter_queue.topic, key: original_partition, payload: .raw_payload, headers: .headers.merge( 'original_topic' => topic.name, 'original_partition' => original_partition, 'original_offset' => .offset.to_s, 'original_consumer_group' => topic.consumer_group.id ) } # Optional method user can define in consumer to enhance the dlq message hash with # some extra details if needed or to replace payload, etc if respond_to?(:enhance_dlq_message, true) ( , ) end end |
#dispatch_to_dlq(skippable_message) ⇒ Object
Moves the broken message into a separate queue defined via the settings
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 76 def dispatch_to_dlq() # DLQ should never try to dispatch a message that was cleaned. It message was # cleaned, we will not have all the needed data. If you see this error, it means # that your processing flow is not as expected and you have cleaned message that # should not be cleaned as it should go to the DLQ raise(Cleaner::Errors::MessageCleanedError) if .cleaned? producer.produce_async( ( ) ) # Notify about dispatch on the events bus Karafka.monitor.instrument( 'dead_letter_queue.dispatched', caller: self, message: ) end |
#dispatch_to_dlq? ⇒ Boolean
Returns should we dispatch the message to DLQ or not. When the dispatch topic is set to false, we will skip the dispatch, effectively ignoring the broken message without taking any action.
129 130 131 |
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 129 def dispatch_to_dlq? topic.dead_letter_queue.topic end |
#find_skippable_message ⇒ Array<Karafka::Messages::Message, Boolean>
Finds the message may want to skip (all, starting from first)
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 60 def = .find do |msg| coordinator.marked? && msg.offset == coordinator.seek_offset end # If we don't have the message matching the last comitted offset, it means that # user operates with manual offsets and we're beyond the batch in which things # broke for the first time. Then we skip the first (as no markings) and we # move on one by one. ? [, true] : [.first, false] end |
#handle_after_consume ⇒ Object
When we encounter non-recoverable message, we skip it and go on with our lives
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/karafka/pro/processing/strategies/dlq/default.rb', line 30 def handle_after_consume coordinator.on_finished do || return if revoked? if coordinator.success? coordinator.pause_tracker.reset return if coordinator.manual_pause? mark_as_consumed() elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries retry_after_pause # If we've reached number of retries that we could, we need to skip the first # message that was not marked as consumed, pause and continue, while also moving # this message to the dead topic else # We reset the pause to indicate we will now consider it as "ok". coordinator.pause_tracker.reset , = dispatch_to_dlq() if dispatch_to_dlq? mark_as_consumed() pause(coordinator.seek_offset, nil, false) end end end |