Module: Karafka::Processing::Strategies::Default

Includes:
Base
Included in:
Karafka::Pro::Processing::Strategies::Default, Dlq, Mom
Defined in:
lib/karafka/processing/strategies/default.rb

Overview

No features enabled. No manual offset management No long running jobs Nothing. Just standard, automatic flow

Constant Summary collapse

FEATURES =

Apply strategy for a non-feature based flow

%i[].freeze

Instance Method Summary collapse

Instance Method Details

#handle_after_consumeObject

Standard flow marks work as consumed and moves on if everything went ok. If there was a processing error, we will pause and continue from the next message (next that is +1 from the last one that was successfully marked as consumed)



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/karafka/processing/strategies/default.rb', line 49

def handle_after_consume
  return if revoked?

  if coordinator.success?
    coordinator.pause_tracker.reset

    # We should not move the offset automatically when the partition was paused
    # If we would not do this upon a revocation during the pause time, a different process
    # would pick not from the place where we paused but from the offset that would be
    # automatically committed here
    return if coordinator.manual_pause?

    mark_as_consumed(messages.last)
  else
    retry_after_pause
  end
end

#handle_before_consumeObject

Increment number of attempts



22
23
24
# File 'lib/karafka/processing/strategies/default.rb', line 22

def handle_before_consume
  coordinator.pause_tracker.increment
end

#handle_before_enqueueObject

No actions needed for the standard flow here



17
18
19
# File 'lib/karafka/processing/strategies/default.rb', line 17

def handle_before_enqueue
  nil
end

#handle_consumeObject

Run the user consumption code



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/karafka/processing/strategies/default.rb', line 27

def handle_consume
  Karafka.monitor.instrument('consumer.consume', caller: self)
  Karafka.monitor.instrument('consumer.consumed', caller: self) do
    consume
  end

  # Mark job as successful
  coordinator.consumption(self).success!
rescue StandardError => e
  # If failed, mark as failed
  coordinator.consumption(self).failure!(e)

  # Re-raise so reported in the consumer
  raise e
ensure
  # We need to decrease number of jobs that this coordinator coordinates as it has finished
  coordinator.decrement
end

#handle_revokedObject

We need to always un-pause the processing in case we have lost a given partition. Otherwise the underlying librdkafka would not know we may want to continue processing and the pause could in theory last forever



70
71
72
73
74
75
76
77
78
79
# File 'lib/karafka/processing/strategies/default.rb', line 70

def handle_revoked
  resume

  coordinator.revoke

  Karafka.monitor.instrument('consumer.revoke', caller: self)
  Karafka.monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
end

#handle_shutdownObject

Runs the shutdown code



82
83
84
85
86
87
# File 'lib/karafka/processing/strategies/default.rb', line 82

def handle_shutdown
  Karafka.monitor.instrument('consumer.shutting_down', caller: self)
  Karafka.monitor.instrument('consumer.shutdown', caller: self) do
    shutdown
  end
end