Module: Karafka::Pro::Processing::Strategies::Aj::DlqMomVp

Includes:
Default, Dlq::Vp, Vp::Default
Included in:
DlqFtrLrjMomVp, DlqFtrMomVp, DlqLrjMomVp
Defined in:
lib/karafka/pro/processing/strategies/aj/dlq_mom_vp.rb

Overview

ActiveJob enabled Manual offset management enabled Virtual Partitions enabled

Constant Summary collapse

FEATURES =

Features for this strategy

%i[
  active_job
  dead_letter_queue
  manual_offset_management
  virtual_partitions
].freeze

Instance Method Summary collapse

Methods included from Vp::Default

#collapse_until!, #collapsed?, #failing?, #mark_as_consumed, #mark_as_consumed!, #mark_in_transaction, #synchronize

Methods included from Default

#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_as_consumed, #mark_as_consumed!, #mark_in_memory, #mark_in_transaction, #mark_with_transaction, #store_offset_metadata, #transaction

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!

Methods included from Karafka::Processing::Strategies::Base

#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Methods included from Dlq::Default

#apply_dlq_flow, #build_dlq_message, #dispatch_if_needed_and_mark_as_consumed, #dispatch_in_a_transaction?, #dispatch_to_dlq, #dispatch_to_dlq?, #find_skippable_message, #mark_after_dispatch?, #mark_as_consumed, #mark_as_consumed!, #mark_dispatched_to_dlq

Instance Method Details

#handle_after_consumeObject

Flow including moving to DLQ in the collapsed mode



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/karafka/pro/processing/strategies/aj/dlq_mom_vp.rb', line 36

def handle_after_consume
  coordinator.on_finished do |last_group_message|
    if coordinator.success?
      coordinator.pause_tracker.reset

      # When this is an ActiveJob running via Pro with virtual partitions, we cannot
      # mark intermediate jobs as processed not to mess up with the ordering.
      # Only when all the jobs are processed and we did not loose the partition
      # assignment and we are not stopping (Pro ActiveJob has an early break) we can
      # commit offsets .
      # For a non virtual partitions case, the flow is regular and state is marked
      # after each successfully processed job
      return if revoked?

      mark_as_consumed(last_group_message)
    else
      apply_dlq_flow do
        # Here we are in a collapsed state, hence we can apply the same logic as
        # Aj::DlqMom
        skippable_message, = find_skippable_message
        dispatch_to_dlq(skippable_message) if dispatch_to_dlq?
        mark_dispatched_to_dlq(skippable_message)
      end
    end
  end
end