Module: Karafka::Pro::Processing::Strategies::Vp::Default

Includes:
Default
Included in:
Aj::DlqMomVp, Aj::FtrLrjMomVp, Aj::LrjMomVp, Aj::MomVp, Dlq::FtrLrjMomVp, Dlq::FtrLrjVp, Dlq::FtrVp, Dlq::LrjVp, Dlq::Vp, Ftr::Vp, Lrj::FtrVp, Lrj::Vp, Mom::Vp
Defined in:
lib/karafka/pro/processing/strategies/vp/default.rb

Overview

Strategy used when only the Virtual Partitions feature is enabled.

Constant Summary

FEATURES =

Features for this strategy

%i[
  virtual_partitions
].freeze

Instance Method Summary

Methods included from Default

#handle_after_consume, #handle_before_consume, #handle_consume, #handle_revoked

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_after_consume, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Methods included from Karafka::Processing::Strategies::Base

#handle_after_consume, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown

Instance Method Details

#collapsed? ⇒ Boolean

Returns whether virtual processing is collapsed in the context of the given consumer.

Returns:

  • (Boolean)

    whether virtual processing is collapsed in the context of the given consumer.



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 71

def collapsed?
  coordinator.collapsed?
end

#failing? ⇒ Boolean

Note:

We’ve named it `#failing?` instead of `#failure?` because it is meant to be used from within virtual partitions, where we want a notion of collective failing, not just failing “local” to our own processing. We “are” failing when other virtual partitions raise an error, even though locally we are still processing.

Returns true if any of the virtual partitions we are operating with in the entangled mode has already failed and we know we are failing collectively. Useful for stopping early to minimize the number of things processed twice.

Returns:

  • (Boolean)

    true if any of the virtual partitions we are operating with in the entangled mode has already failed and we know we are failing collectively. Useful for stopping early to minimize the number of things processed twice.



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 83

def failing?
  coordinator.failure?
end
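
The early-stop use of `#failing?` described above can be sketched as follows. This is a minimal, self-contained illustration, not Karafka code: `SketchCoordinator` and `SketchConsumer` are hypothetical stand-ins that model only the shared failure flag and the `#failing?` delegation shown above.

```ruby
# Hypothetical stand-in for the coordinator shared by all virtual partitions
# of one batch. Only the collective failure flag is modelled here.
class SketchCoordinator
  def initialize
    @failure = false
    @mutex = Mutex.new
  end

  # Called when any virtual partition raises an error
  def failure!
    @mutex.synchronize { @failure = true }
  end

  def failure?
    @mutex.synchronize { @failure }
  end
end

# Hypothetical consumer exposing the same #failing? delegation as the strategy
class SketchConsumer
  attr_reader :processed

  def initialize(coordinator, messages)
    @coordinator = coordinator
    @messages = messages
    @processed = []
  end

  def failing?
    @coordinator.failure?
  end

  def consume
    @messages.each do |message|
      # Another virtual partition already failed, so this work would be
      # redone on retry. Stop early instead of processing it twice.
      break if failing?

      @processed << message
    end
  end
end
```

In a real consumer the same check would typically sit at the top of the per-message loop, so that a failure raised in a sibling virtual partition is noticed between messages.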

#mark_as_consumed(message) ⇒ Object

Note:

This virtual offset management uses the regular default marking API underneath. We do not alter the “real” marking API, as VPs are just one of many cases we want to support, and we do not want to impact the others with collective offset management.

Parameters:



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 35

def mark_as_consumed(message)
  return super if collapsed?

  manager = coordinator.virtual_offset_manager

  coordinator.synchronize do
    manager.mark(message)
    # If this is last marking on a finished flow, we can use the original
    # last message and in order to do so, we need to mark all previous messages as
    # consumed as otherwise the computed offset could be different
    # We mark until our offset just in case of a DLQ flow or similar, where we do not
    # want to mark all but until the expected location
    manager.mark_until(message) if coordinator.finished?

    return revoked? unless manager.markable?
  end

  manager.markable? ? super(manager.markable) : revoked?
end
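
To make the idea of collective offset management more concrete, here is a simplified model of what a virtual offset manager might track. It is a sketch under stated assumptions and not Karafka's actual implementation: the class name `SketchOffsetManager`, its `register` method, and the use of plain integer offsets instead of message objects are all hypothetical. The assumed rule is that an offset is only safe to commit once every lower offset in the batch has been marked, regardless of which virtual partition handled it.

```ruby
# Simplified, hypothetical model of collective offset tracking across
# virtual partitions. Offsets are plain integers for brevity.
class SketchOffsetManager
  def initialize
    @groups = []      # offsets handled by each virtual partition
    @marked = {}      # offset => true once marked as consumed
    @real_offset = -1 # highest offset that is safe to commit, -1 if none
  end

  # Registers the offsets assigned to one virtual partition (assumed helper)
  def register(offsets)
    @groups << offsets
    offsets.each { |offset| @marked[offset] = false }
  end

  # Marks an offset and every earlier offset from the same virtual partition
  def mark(offset)
    group = @groups.find { |offsets| offsets.include?(offset) }
    raise ArgumentError, "unknown offset #{offset}" unless group

    group.each { |o| @marked[o] = true if o <= offset }
    recompute
  end

  # Marks every known offset up to and including the given one
  def mark_until(offset)
    @marked.keys.each { |o| @marked[o] = true if o <= offset }
    recompute
  end

  def markable?
    @real_offset >= 0
  end

  def markable
    raise 'nothing markable yet' unless markable?

    @real_offset
  end

  private

  # Finds the highest offset with no unmarked offsets below it
  def recompute
    @real_offset = -1
    @marked.keys.sort.each do |o|
      break unless @marked[o]

      @real_offset = o
    end
  end
end
```

Under this model, with offsets 0, 2, 4 in one virtual partition and 1, 3, 5 in another, marking offset 2 makes only offset 0 markable, because offset 1 is still pending. Once offset 1 is marked, the markable offset advances to 2.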

#mark_as_consumed!(message) ⇒ Object

Parameters:



# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 56

def mark_as_consumed!(message)
  return super if collapsed?

  manager = coordinator.virtual_offset_manager

  coordinator.synchronize do
    manager.mark(message)
    manager.mark_until(message) if coordinator.finished?
  end

  manager.markable? ? super(manager.markable) : revoked?
end