Module: Karafka::Pro::Processing::Strategies::Default

Overview

No features enabled: no manual offset management, no long-running jobs, no virtual partitions. Nothing. Just the standard, automatic flow.

Constant Summary collapse

FEATURES =

Apply strategy for a non-feature based flow

%i[].freeze

Instance Method Summary collapse

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_idle, #handle_shutdown, #mark_as_consumed, #mark_as_consumed!

Methods included from Karafka::Processing::Strategies::Base

#handle_idle, #handle_shutdown

Instance Method Details

#handle_after_consumeObject

Standard flow without any features



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/karafka/pro/processing/strategies/default.rb', line 70

def handle_after_consume
  # Post-consumption bookkeeping, executed inside the coordinator's `on_finished` hook.
  # The hook yields the last message of the processed group.
  coordinator.on_finished do |final_message|
    # Nothing to do when the partition is no longer ours. The `return` here leaves the
    # whole method, not only the block.
    return if revoked?

    # Failed processing: hand over to the pause-and-retry path and finish the block
    next retry_after_pause unless coordinator.success?

    # Successful processing clears the pause tracker
    coordinator.pause_tracker.reset

    # When a manual pause was requested we must not mark the last message, otherwise
    # the marking could override that pause upon a rebalance
    return if coordinator.manual_pause?

    mark_as_consumed(final_message)
  end
end

#handle_before_consumeObject

Increments the number of attempts once per “full” job. For all virtual partitions (VP) on a single topic partition, this should also run only once.



37
38
39
40
41
# File 'lib/karafka/pro/processing/strategies/default.rb', line 37

def handle_before_consume
  # Bump the pause tracker's attempt counter from within the coordinator's
  # `on_started` hook
  coordinator.on_started { coordinator.pause_tracker.increment }
end

#handle_before_enqueueObject

No actions needed for the standard flow here



31
32
33
# File 'lib/karafka/pro/processing/strategies/default.rb', line 31

def handle_before_enqueue
  # Intentionally empty: the standard flow has nothing to do before enqueuing.
  # An empty method body evaluates to nil.
end

#handle_consumeObject

Run the user consumption code



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/karafka/pro/processing/strategies/default.rb', line 44

def handle_consume
  # A revoked partition gets no user code executed at all. This mostly happens when
  # an LRJ job is already sitting in the internal worker queue and the partition is
  # revoked before the job is processed.
  unless revoked?
    Karafka.monitor.instrument('consumer.consume', caller: self)
    Karafka.monitor.instrument('consumer.consumed', caller: self) { consume }
  end

  # Reaching this line means nothing raised, so the job is flagged as successful
  coordinator.success!(self)
rescue StandardError => error
  # Record the failure on the coordinator first...
  coordinator.failure!(self, error)

  # ...then propagate the very same error so it is reported in the consumer
  raise error
ensure
  # Success or failure, this job is done: lower the count of jobs tracked by the
  # coordinator
  coordinator.decrement
end

#handle_revokedObject

Standard flow for handling a partition revocation



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/karafka/pro/processing/strategies/default.rb', line 89

def handle_revoked
  # Inside the coordinator's `on_revoked` hook: resume first, then flag the
  # coordinator as revoked
  coordinator.on_revoked do
    resume
    coordinator.revoke
  end

  # Emit the plain 'consumer.revoke' event, then run the `revoked` callback wrapped
  # in the 'consumer.revoked' instrumentation
  Karafka.monitor.instrument('consumer.revoke', caller: self)
  Karafka.monitor.instrument('consumer.revoked', caller: self) { revoked }
end