Module: Karafka::Pro::Processing::Strategies::Dlq

Includes:
Default
Included in:
DlqLrj, DlqMom
Defined in:
lib/karafka/pro/processing/strategies/dlq.rb

Overview

Only dead letter queue enabled

Constant Summary collapse

FEATURES =

Features for this strategy

%i[
  dead_letter_queue
].freeze

Instance Method Summary collapse

Methods included from Default

#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked

Methods included from Karafka::Processing::Strategies::Default

#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown

Methods included from Karafka::Processing::Strategies::Base

#handle_before_consume, #handle_before_enqueue, #handle_consume, #handle_revoked, #handle_shutdown

Instance Method Details

#dispatch_to_dlq(skippable_message) ⇒ Object

Moves the broken message into a separate queue defined via the settings

Parameters:



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 66

# Publishes the given message to the dead letter queue topic configured for the current
# topic and then emits a `dead_letter_queue.dispatched` event via `Karafka.monitor`.
#
# The dispatched record reuses the raw payload of the source message, is keyed with the
# source partition (as a string) and carries the source headers extended with the origin
# topic, partition and offset.
#
# @param skippable_message [Karafka::Messages::Message] message that should be moved to
#   the dead letter queue
# @return [Object] whatever `Karafka.monitor.instrument` returns
def dispatch_to_dlq(skippable_message)
  # Where the message came from; all values are stringified before being sent as headers
  origin_details = {
    'original_topic' => topic.name,
    'original_partition' => skippable_message.partition.to_s,
    'original_offset' => skippable_message.offset.to_s
  }

  # `merge` builds a new hash, so the headers of the source message are left untouched
  dlq_headers = skippable_message.headers.merge(origin_details)

  producer.produce_async(
    topic: topic.dead_letter_queue.topic,
    payload: skippable_message.raw_payload,
    key: skippable_message.partition.to_s,
    headers: dlq_headers
  )

  # Let the listeners on the events bus know that the dispatch has been requested
  Karafka.monitor.instrument(
    'dead_letter_queue.dispatched',
    caller: self,
    message: skippable_message
  )
end

#dispatch_to_dlq?Boolean

Returns whether the message should be dispatched to the DLQ. When the dispatch topic is set to false, the dispatch is skipped, effectively ignoring the broken message without taking any action.

Returns:

  • (Boolean)

    whether the message should be dispatched to the DLQ. When the dispatch topic is set to false, the dispatch is skipped, effectively ignoring the broken message without taking any action.



89
90
91
# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 89

# Tells whether a broken message should be dispatched to the dead letter queue.
#
# The decision is based solely on the configured dead letter queue topic: any truthy
# value enables the dispatch, while `false` (or `nil`) disables it, in which case the
# broken message is skipped without being sent anywhere.
#
# @return [Boolean] `true` when a dead letter queue topic is configured, `false` otherwise
def dispatch_to_dlq?
  # Coerce to a strict boolean so the configured topic value itself is never leaked
  # through a predicate that is documented to return a Boolean
  topic.dead_letter_queue.topic ? true : false
end

#find_skippable_messageKarafka::Messages::Message

Finds the message we may want to skip (searching all messages, starting from the first)

Returns:



57
58
59
60
# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 57

# Looks through the current messages (starting from the first one) for the message whose
# offset equals the coordinator's seek offset.
#
# @return [Karafka::Messages::Message] the first message matching the seek offset
# @raise [Errors::SkipMessageNotFoundError] when no message matches; the error message
#   is the name of the current topic
def find_skippable_message
  candidate = messages.find { |message| message.offset == coordinator.seek_offset }
  return candidate if candidate

  raise Errors::SkipMessageNotFoundError, topic.name
end

#handle_after_consumeObject

When we encounter a non-recoverable message, we skip it and go on with our lives



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/karafka/pro/processing/strategies/dlq.rb', line 28

# Post-consumption flow for the dead letter queue strategy. The whole decision logic is
# handed to `coordinator.on_finished` as a block:
#
# - nothing is done when the assignment has been revoked,
# - on success the pause tracker is reset and, unless a manual pause is in effect, the
#   last message is marked as consumed,
# - on failure we retry after a pause for as long as the attempt number does not exceed
#   the configured `max_retries`,
# - once retries are exhausted, the broken message is (optionally) dispatched to the dead
#   letter queue, marked as consumed and processing is paused at the seek offset.
#
# @return [Object, nil] `nil` when the block exits early, otherwise whatever
#   `coordinator.on_finished` returns
def handle_after_consume
  coordinator.on_finished do
    # NOTE: `return` inside this block leaves the whole method, not only the block
    return if revoked?

    if coordinator.success?
      coordinator.pause_tracker.reset

      # Manual pause in effect, so the last message is not marked as consumed here
      return if coordinator.manual_pause?

      mark_as_consumed(messages.last)
    else
      can_retry = coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries

      if can_retry
        retry_after_pause
      else
        # Out of retries: skip the first message that was not marked as consumed, move it
        # to the dead topic (when enabled), then pause and carry on.
        # Resetting the tracker signals that from now on we treat the state as "ok".
        coordinator.pause_tracker.reset
        broken_message = find_skippable_message
        dispatch_to_dlq(broken_message) if dispatch_to_dlq?
        mark_as_consumed(broken_message)
        # NOTE(review): meaning of the `nil` and `false` arguments is defined by `#pause`,
        # which is not visible here - confirm against its signature
        pause(coordinator.seek_offset, nil, false)
      end
    end
  end
end