Module: Karafka::Processing::Strategies::Default

Includes:
Base
Included in:
Karafka::Pro::Processing::Strategies::Default, Dlq, Mom
Defined in:
lib/karafka/processing/strategies/default.rb

Overview

No features enabled. No manual offset management. No long-running jobs. Nothing. Just the standard, automatic flow.

Constant Summary

FEATURES = %i[].freeze

Apply strategy for a non-feature based flow

Instance Method Details

#commit_offsets(async: true) ⇒ Boolean

Note:

Due to its async nature, the result may not fully represent the offset state in some edge cases (for example, when processing goes beyond max.poll.interval.ms).

Triggers an offset commit, asynchronous by default

Parameters:

  • async (Boolean) (defaults to: true)

    whether to commit asynchronously (default) or synchronously

Returns:

  • (Boolean)

    true if we still own the partition.



# File 'lib/karafka/processing/strategies/default.rb', line 94

def commit_offsets(async: true)
  # Do not commit if we already lost the assignment
  return false if revoked?
  return true if client.commit_offsets(async: async)

  # This will once more check the librdkafka revocation status and will revoke the
  # coordinator in case it was not revoked
  revoked?
end
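
The control flow above can be sketched in isolation. This is a minimal illustration, not Karafka code: `FakeClient` and `CommitSketch` are hypothetical stand-ins, and the revocation state is a plain flag rather than a librdkafka check.

```ruby
# Hypothetical stand-in for the Kafka client; records each commit request.
class FakeClient
  attr_reader :calls

  def initialize(result)
    @result = result
    @calls = []
  end

  def commit_offsets(async: true)
    @calls << async
    @result
  end
end

# Hypothetical host object mirroring the branching of #commit_offsets.
class CommitSketch
  def initialize(client:, revoked:)
    @client = client
    @revoked = revoked
  end

  def revoked?
    @revoked
  end

  def commit_offsets(async: true)
    # A revoked partition short-circuits before the client is touched
    return false if revoked?
    return true if @client.commit_offsets(async: async)

    # Failed commit: fall through to the revocation re-check, as in the source
    revoked?
  end
end
```

With a revoked sketch the client is never called, while an owned sketch forwards the `async` flag to the client unchanged.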

#commit_offsets! ⇒ Boolean

Note:

This is fully synchronous, hence its result can be used in DB transactions and similar flows as a way of making sure that we still own the partition.

Triggers a synchronous offsets commit to Kafka

Returns:

  • (Boolean)

    true if we still own the partition, false otherwise.



# File 'lib/karafka/processing/strategies/default.rb', line 109

def commit_offsets!
  commit_offsets(async: false)
end
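
As a sketch of the DB transaction use mentioned in the note: the example below assumes a hypothetical in-memory `FakeDb` with rollback support and a `TransactionalSketch` whose `commit_offsets!` is stubbed to a fixed value. Neither is part of Karafka; a real application would use its own database layer.

```ruby
# Hypothetical in-memory store with all-or-nothing transactions.
class FakeDb
  Rollback = Class.new(StandardError)

  attr_reader :rows

  def initialize
    @rows = []
  end

  def transaction
    snapshot = @rows.dup
    yield
  rescue Rollback
    # Restore the state from before the block ran
    @rows = snapshot
  end
end

# Hypothetical consumer-like object; commit_offsets! is stubbed for the sketch.
class TransactionalSketch
  def initialize(db, owned:)
    @db = db
    @owned = owned
  end

  def commit_offsets!
    @owned
  end

  def persist(payloads)
    @db.transaction do
      payloads.each { |payload| @db.rows << payload }
      # Abort the write when the synchronous commit reports lost ownership
      raise FakeDb::Rollback unless commit_offsets!
    end
  end
end
```

The design choice here is to make the synchronous commit the last step inside the transaction, so that rows are kept only when partition ownership was confirmed.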

#handle_after_consume ⇒ Object

The standard flow marks the work as consumed and moves on if everything went well. If there was a processing error, we pause and then continue from the next message, that is, the one at +1 from the last message that was successfully marked as consumed.



# File 'lib/karafka/processing/strategies/default.rb', line 151

def handle_after_consume
  return if revoked?

  if coordinator.success?
    coordinator.pause_tracker.reset

    # We should not move the offset automatically when the partition was paused
    # If we would not do this upon a revocation during the pause time, a different process
    # would pick not from the place where we paused but from the offset that would be
    # automatically committed here
    return if coordinator.manual_pause?

    mark_as_consumed(messages.last)
  else
    retry_after_pause
  end
end
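
The branching above reduces to a small decision table. The following sketch expresses it as a pure function; `after_consume_action` and the returned symbols are hypothetical names chosen for illustration only.

```ruby
# Returns a symbol describing what the default flow does after a batch,
# given the three conditions checked in #handle_after_consume.
def after_consume_action(revoked:, success:, manual_pause:)
  # Nothing happens once the partition is lost
  return :nothing if revoked
  # A failed batch is retried after a pause
  return :retry_after_pause unless success
  # A manual pause resets the tracker but must not move the offset
  return :reset_pause_tracker_only if manual_pause

  :reset_pause_tracker_and_mark_last_message
end
```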

#handle_before_consume ⇒ Object

Increment number of attempts



# File 'lib/karafka/processing/strategies/default.rb', line 114

def handle_before_consume
  coordinator.pause_tracker.increment
end

#handle_consume ⇒ Object

Run the user consumption code



# File 'lib/karafka/processing/strategies/default.rb', line 130

def handle_consume
  monitor.instrument('consumer.consume', caller: self)
  monitor.instrument('consumer.consumed', caller: self) do
    consume
  end

  # Mark job as successful
  coordinator.success!(self)
rescue StandardError => e
  coordinator.failure!(self, e)

  # Re-raise so reported in the consumer
  raise e
ensure
  # We need to decrease number of jobs that this coordinator coordinates as it has finished
  coordinator.decrement(:consume)
end
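
To illustrate the ordering of the success, failure and ensure steps, here is a minimal sketch with the instrumentation left out. `RecordingCoordinator` and `run_consume` are hypothetical names; the coordinator only records which of its methods were called.

```ruby
# Hypothetical coordinator that records calls instead of tracking real jobs.
class RecordingCoordinator
  attr_reader :events

  def initialize
    @events = []
  end

  def success!(_consumer)
    @events << :success
  end

  def failure!(_consumer, error)
    @events << [:failure, error.message]
  end

  def decrement(job_type)
    @events << [:decrement, job_type]
  end
end

# Mirrors the rescue/ensure shape of #handle_consume around a block.
def run_consume(coordinator)
  yield
  coordinator.success!(self)
rescue StandardError => e
  coordinator.failure!(self, e)
  # Re-raised so the caller still sees the error
  raise e
ensure
  # Runs on both paths, so the job counter always goes down
  coordinator.decrement(:consume)
end
```

In both the success and the failure path the decrement is recorded last, and the failure path never records a success.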

#handle_eofed ⇒ Object

Runs the consumer `#eofed` method with reporting



# File 'lib/karafka/processing/strategies/default.rb', line 177

def handle_eofed
  monitor.instrument('consumer.eof', caller: self)
  monitor.instrument('consumer.eofed', caller: self) do
    eofed
  end
ensure
  coordinator.decrement(:eofed)
end

#handle_idle ⇒ Object

Code that should run on an idle run, when there are no messages available



# File 'lib/karafka/processing/strategies/default.rb', line 170

def handle_idle
  nil
ensure
  coordinator.decrement(:idle)
end

#handle_initialized ⇒ Object

Note:

It runs in the listener loop. Should not be used for anything heavy or with any potential errors. Mostly for initialization of states, etc.

Runs the post-creation, post-assignment code



# File 'lib/karafka/processing/strategies/default.rb', line 37

def handle_initialized
  monitor.instrument('consumer.initialize', caller: self)
  monitor.instrument('consumer.initialized', caller: self) do
    initialized
  end
end
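
Most handlers on this page share the same two-event shape: one event emitted up front without a block, and a second event that wraps the user code. The sketch below shows the resulting order with a hypothetical `FakeMonitor`. It assumes that a block-wrapping event is recorded after its block finishes, which is an assumption of this sketch and not a statement about the real monitor.

```ruby
# Hypothetical monitor that appends event ids to a shared log.
class FakeMonitor
  def initialize(log)
    @log = log
  end

  # Runs the optional block first, then records the event id (assumed order).
  def instrument(event_id, _payload = {})
    result = yield if block_given?
    @log << event_id
    result
  end
end

log = []
monitor = FakeMonitor.new(log)

# Same call shape as #handle_initialized, with the user hook replaced by a marker
monitor.instrument('consumer.initialize', caller: self)
monitor.instrument('consumer.initialized', caller: self) do
  log << :user_initialized_code
end
```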

#handle_revoked ⇒ Object

We need to always un-pause the processing in case we have lost a given partition. Otherwise the underlying librdkafka would not know we may want to continue processing and the pause could in theory last forever



# File 'lib/karafka/processing/strategies/default.rb', line 189

def handle_revoked
  resume

  coordinator.revoke

  monitor.instrument('consumer.revoke', caller: self)
  monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
ensure
  coordinator.decrement(:revoked)
end

#handle_shutdown ⇒ Object

Runs the shutdown code



# File 'lib/karafka/processing/strategies/default.rb', line 203

def handle_shutdown
  monitor.instrument('consumer.shutting_down', caller: self)
  monitor.instrument('consumer.shutdown', caller: self) do
    shutdown
  end
ensure
  coordinator.decrement(:shutdown)
end

#handle_wrap(action, &block) ⇒ Object

Executes the given action inside the consumer's wrapper method (`#wrap`)

Parameters:

  • action (Symbol)
  • block (Proc)


# File 'lib/karafka/processing/strategies/default.rb', line 122

def handle_wrap(action, &block)
  monitor.instrument('consumer.wrap', caller: self)
  monitor.instrument('consumer.wrapped', caller: self) do
    wrap(action, &block)
  end
end

#mark_as_consumed(message) ⇒ Boolean

Note:

We keep track of this offset in case we mark a message as consumed and then get an error when processing another message. In such a case we do not pause on the message we have already processed but rather on the next one. This applies to both the sync and async versions of this method.

Marks message as consumed in an async way.

Parameters:

  • message

    message whose offset should be marked as consumed

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able and that we have lost the partition.



# File 'lib/karafka/processing/strategies/default.rb', line 54

def mark_as_consumed(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if coordinator.seek_offset.nil?
  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?
  return revoked? unless client.mark_as_consumed(message)

  coordinator.seek_offset = message.offset + 1

  true
end
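
The seek offset bookkeeping can be sketched without a Kafka client. `SketchMessage` and `OffsetTracker` are hypothetical names, and the sketch assumes the client-side marking always succeeds, so only the guard clauses and the offset update are modelled.

```ruby
# Hypothetical message carrying only an offset.
SketchMessage = Struct.new(:offset)

# Hypothetical tracker mirroring the guards of #mark_as_consumed.
class OffsetTracker
  attr_accessor :seek_offset

  def initialize(seek_offset:, revoked: false)
    @seek_offset = seek_offset
    @revoked = revoked
  end

  def mark(message)
    # No seek offset (offset reset was requested): marking is ignored
    return true if seek_offset.nil?
    # Offsets earlier than the tracked position are ignored
    return true if seek_offset > message.offset
    return false if @revoked

    # The next position to continue from is one past the marked message
    self.seek_offset = message.offset + 1
    true
  end
end
```

Marking offset 12 on a tracker that starts at 10 moves the seek offset to 13, and a later attempt to mark offset 5 leaves it at 13.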

#mark_as_consumed!(message) ⇒ Boolean

Marks message as consumed in a sync way.

Parameters:

  • message

    message whose offset should be marked as consumed

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able and that we have lost the partition.



# File 'lib/karafka/processing/strategies/default.rb', line 73

def mark_as_consumed!(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if coordinator.seek_offset.nil?
  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?

  return revoked? unless client.mark_as_consumed!(message)

  coordinator.seek_offset = message.offset + 1

  true
end