Module: Karafka::Processing::Strategies::AjDlqMom

Includes:
DlqMom
Defined in:
lib/karafka/processing/strategies/aj_dlq_mom.rb

Overview

ActiveJob strategy to cooperate with the DLQ.

AJ uses MOM by default because it delegates offset management to the AJ consumer. With DLQ, however, there is an extra case: because processing is ordered, a broken job that is skipped must also have its offset marked.

Constant Summary

FEATURES =

Apply this strategy only when using AJ with MOM and DLQ

%i[
  active_job
  dead_letter_queue
  manual_offset_management
].freeze
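
To illustrate how a feature list like this could be used to pick a strategy, here is a minimal sketch. It is not Karafka's actual selector: `SketchStrategies`, `pick_strategy`, and the two stand-in modules are hypothetical names, and the matching rule (exact, order-insensitive equality of feature sets) is an assumption made for the example.

```ruby
# Hypothetical stand-ins; these are NOT Karafka's real strategy modules.
module SketchStrategies
  module AjMom
    FEATURES = %i[active_job manual_offset_management].freeze
  end

  module AjDlqMom
    FEATURES = %i[active_job dead_letter_queue manual_offset_management].freeze
  end

  ALL = [AjMom, AjDlqMom].freeze
end

# Returns the strategy whose FEATURES equal the enabled features
# (order-insensitive), or nil when nothing matches.
# The exact-match rule is an assumption of this sketch.
def pick_strategy(enabled_features)
  wanted = enabled_features.sort
  SketchStrategies::ALL.find { |strategy| strategy::FEATURES.sort == wanted }
end
```

Under this assumed rule, a topic with only `active_job` and `manual_offset_management` enabled would not select the DLQ variant; all three features have to be present.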

Instance Method Summary

Methods included from DlqMom

#mark_after_dispatch?

Methods included from Dlq

#dispatch_to_dlq, #find_skippable_message, #mark_after_dispatch?, #mark_as_consumed, #mark_as_consumed!, #mark_dispatched_to_dlq

Methods included from Default

#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!

Methods included from Base

#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Instance Method Details

#handle_after_consume ⇒ Object

Defines how consumption is finalized after processing.



# File 'lib/karafka/processing/strategies/aj_dlq_mom.rb', line 22

def handle_after_consume
  return if revoked?

  if coordinator.success?
    # Do NOT commit offsets, they are committed after each job in the AJ consumer.
    coordinator.pause_tracker.reset
  elsif coordinator.pause_tracker.attempt <= topic.dead_letter_queue.max_retries
    retry_after_pause
  else
    coordinator.pause_tracker.reset
    skippable_message, = find_skippable_message
    dispatch_to_dlq(skippable_message)
    # We can commit the offset here because we know that we skip it "forever" and
    # since AJ consumer commits the offset after each job, we also know that the
    # previous job was successful
    mark_dispatched_to_dlq(skippable_message)
    pause(coordinator.seek_offset, nil, false)
  end
end
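
The branching in the listing above can be summarized as a small decision function. This is only a sketch for reasoning about the control flow: `after_consume_action` and the symbols it returns are hypothetical, and it names the action instead of performing it, so it needs no coordinator, pause tracker, or topic.

```ruby
# Hypothetical helper mirroring the branches of #handle_after_consume.
# Returns a symbol naming the action; it performs no side effects.
def after_consume_action(success:, attempt:, max_retries:, revoked: false)
  # A revoked partition is left alone entirely.
  return :nothing if revoked
  # On success only the pause tracker is reset; offsets are not committed here.
  return :reset_pause_tracker if success
  # Same comparison as the listing: retry while attempt <= max_retries.
  return :retry_after_pause if attempt <= max_retries

  # Retries exhausted: dispatch to the DLQ and mark the skipped message.
  :dispatch_to_dlq_and_mark
end

# Example: with max_retries of 2, list the action chosen for failing attempts 1..3.
actions = (1..3).map do |attempt|
  after_consume_action(success: false, attempt: attempt, max_retries: 2)
end
```

Because the comparison is `<=`, a failing attempt whose number equals `max_retries` still leads to a retry; only an attempt number above it reaches the DLQ branch.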