Module: Karafka::Pro::Processing::Strategies::Aj::FtrMomVp

Includes:
FtrMom, MomVp
Defined in:
lib/karafka/pro/processing/strategies/aj/ftr_mom_vp.rb

Overview

ActiveJob enabled
Filtering enabled
Manual offset management enabled
Virtual partitions enabled

Constant Summary

FEATURES =

Features for this strategy

%i[
  active_job
  filtering
  manual_offset_management
  virtual_partitions
].freeze
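
As a rough illustration of how a feature list like this can identify a strategy, the sketch below compares a set of enabled features against FEATURES regardless of order. The helper name features_match? is hypothetical and is not part of Karafka; the actual strategy selection logic is not shown on this page and may differ.

```ruby
# Copy of the constant documented above, repeated so the sketch is self-contained.
FEATURES = %i[
  active_job
  filtering
  manual_offset_management
  virtual_partitions
].freeze

# Hypothetical helper (an assumption, not a Karafka API): returns true when
# the enabled features are exactly the ones required, in any order.
def features_match?(enabled, required = FEATURES)
  enabled.sort == required.sort
end
```

Under these assumptions, features_match?(%i[virtual_partitions filtering active_job manual_offset_management]) is true, while a subset such as %i[active_job filtering] does not match.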

Instance Method Summary

Methods included from Vp::Default

#collapse_until!, #collapsed?, #failing?, #mark_as_consumed, #mark_as_consumed!, #mark_in_transaction, #synchronize

Methods included from Default

#handle_before_consume, #handle_before_schedule_consume, #handle_before_schedule_tick, #handle_consume, #handle_revoked, #handle_tick, #mark_as_consumed, #mark_as_consumed!, #mark_in_memory, #mark_in_transaction, #mark_with_transaction, #store_offset_metadata, #transaction

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_before_consume, #handle_consume, #handle_eofed, #handle_idle, #handle_initialized, #handle_revoked, #handle_shutdown, #handle_wrap, #mark_as_consumed, #mark_as_consumed!

Methods included from Karafka::Processing::Strategies::Base

#handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Methods included from Ftr::Default

#handle_idle, #handle_post_filtering

Instance Method Details

#handle_after_consume ⇒ Object

ActiveJob (AJ) with virtual partitions (VPs) always has intermediate marking disabled, so the offset must always be marked after execution.



# File 'lib/karafka/pro/processing/strategies/aj/ftr_mom_vp.rb', line 38

def handle_after_consume
  coordinator.on_finished do |last_group_message|
    if coordinator.success?
      coordinator.pause_tracker.reset

      return if revoked?

      mark_as_consumed(last_group_message)

      handle_post_filtering
    else
      retry_after_pause
    end
  end
end
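
The three outcomes of this method (mark and run post-filtering, exit early on revocation, or retry after a pause) can be traced with a minimal sketch. FakePauseTracker, FakeCoordinator, SketchConsumer and run_sketch are hypothetical stand-ins written for illustration only; they are not Karafka classes. The stub coordinator yields unconditionally, whereas the real coordinator decides when the block runs.

```ruby
# Hypothetical stand-in for the pause tracker: only counts resets.
class FakePauseTracker
  attr_reader :resets

  def initialize
    @resets = 0
  end

  def reset
    @resets += 1
  end
end

# Hypothetical stand-in for the coordinator. It always yields the given
# message; the real coordinator controls when the block is executed.
class FakeCoordinator
  attr_reader :pause_tracker

  def initialize(success:, last_message:)
    @success = success
    @last_message = last_message
    @pause_tracker = FakePauseTracker.new
  end

  def success?
    @success
  end

  def on_finished
    yield(@last_message)
  end
end

# Hypothetical consumer that records which steps ran instead of doing real work.
class SketchConsumer
  attr_reader :coordinator, :events

  def initialize(coordinator, revoked: false)
    @coordinator = coordinator
    @revoked = revoked
    @events = []
  end

  def revoked?
    @revoked
  end

  def mark_as_consumed(message)
    @events << [:marked, message]
  end

  def handle_post_filtering
    @events << [:post_filtering]
  end

  def retry_after_pause
    @events << [:retry_after_pause]
  end

  # Same body as the documented method above.
  def handle_after_consume
    coordinator.on_finished do |last_group_message|
      if coordinator.success?
        coordinator.pause_tracker.reset

        # `return` inside the block exits handle_after_consume itself
        return if revoked?

        mark_as_consumed(last_group_message)

        handle_post_filtering
      else
        retry_after_pause
      end
    end
  end
end

# Runs one scenario and returns the recorded steps.
def run_sketch(success:, revoked: false)
  coordinator = FakeCoordinator.new(success: success, last_message: :offset_41)
  consumer = SketchConsumer.new(coordinator, revoked: revoked)
  consumer.handle_after_consume
  consumer.events
end
```

With these stubs, run_sketch(success: true) records the marking followed by post-filtering, run_sketch(success: true, revoked: true) records nothing because the method returns before marking, and run_sketch(success: false) records only the retry. Note that the pause tracker is reset on success even when the partition has been revoked.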