Module: Karafka::Pro::Processing::Strategies::Default

Overview

No features enabled: no manual offset management, no long-running jobs, no virtual partitions — nothing. Just the standard, automatic flow.

Constant Summary

FEATURES =

Apply this strategy for a flow that uses no features

%i[].freeze

Instance Method Summary

Methods included from Karafka::Processing::Strategies::Default

#commit_offsets, #commit_offsets!, #handle_eofed, #handle_idle, #handle_initialized, #handle_shutdown, #handle_wrap

Methods included from Karafka::Processing::Strategies::Base

#handle_idle, #handle_shutdown

Instance Method Details

#handle_after_consume ⇒ Object

Standard flow without any features



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/karafka/pro/processing/strategies/default.rb', line 312

# Standard post-consumption flow without any features enabled.
#
# The work is handed to `coordinator.on_finished`, which yields the last message of the
# group. On success the pause tracker is reset and that message is marked as consumed
# (unless a manual pause is active). On failure the retry-after-pause flow is used.
# Partitions that were revoked are left untouched.
def handle_after_consume
  coordinator.on_finished do |last_message|
    # `return` (and not `next`) on purpose: it leaves the whole method, not only the block
    return if revoked?

    # Failed runs are retried after a pause instead of being marked
    next retry_after_pause unless coordinator.success?

    coordinator.pause_tracker.reset

    # Marking after a manual pause could override that pause upon rebalance, so in that
    # case the last message is intentionally not marked
    return if coordinator.manual_pause?

    mark_as_consumed(last_message)
  end
end

#handle_before_consume ⇒ Object

Increments the number of attempts once per “full” job. When virtual partitions (VPs) are used, this should still run only once per topic partition.



279
280
281
282
283
# File 'lib/karafka/pro/processing/strategies/default.rb', line 279

# Counts one more processing attempt for the pause tracker.
#
# The increment goes through `coordinator.on_started`. It is meant to happen once per
# "full" job, even when several virtual partitions of one topic partition run it.
def handle_before_consume
  coordinator.on_started { coordinator.pause_tracker.increment }
end

#handle_before_schedule_consume ⇒ Object

No action is needed for the standard flow here



271
272
273
274
275
# File 'lib/karafka/pro/processing/strategies/default.rb', line 271

# Hook run before a consume job gets scheduled.
#
# The standard flow has nothing to prepare, so it only publishes the instrumentation event.
#
# @return [nil]
def handle_before_schedule_consume
  event_name = 'consumer.before_schedule_consume'
  monitor.instrument(event_name, caller: self)

  nil
end

#handle_before_schedule_tick ⇒ Object

No action is needed for the standard tick flow



347
348
349
350
351
# File 'lib/karafka/pro/processing/strategies/default.rb', line 347

# Hook run before a tick job gets scheduled.
#
# The standard tick flow has nothing to prepare, so it only publishes the instrumentation
# event.
#
# @return [nil]
def handle_before_schedule_tick
  event_name = 'consumer.before_schedule_tick'
  monitor.instrument(event_name, caller: self)

  nil
end

#handle_consume ⇒ Object

Run the user consumption code



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/karafka/pro/processing/strategies/default.rb', line 286

# Runs the user `#consume` code wrapped in instrumentation and reports the outcome to the
# coordinator.
#
# A revoked partition skips the user code but is still reported as a success. Any
# StandardError is reported to the coordinator as a failure and re-raised. The `:consume`
# job counter is decremented no matter how this ends.
def handle_consume
  # Work must never run for a revoked partition. This mostly matters for LRJ jobs that sat
  # in the internal worker queue while their partition got revoked.
  unless revoked?
    monitor.instrument('consumer.consume', caller: self)
    monitor.instrument('consumer.consumed', caller: self) { consume }
  end

  # Getting here means nothing above raised
  coordinator.success!(self)
rescue StandardError => e
  coordinator.failure!(self, e)

  # Propagate so the error gets reported for this consumer
  raise
ensure
  # This job is done (successfully or not), so the coordinator tracks one job fewer
  coordinator.decrement(:consume)
end

#handle_revoked ⇒ Object

Standard flow for revocation



331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/karafka/pro/processing/strategies/default.rb', line 331

# Standard revocation flow.
#
# The block handed to `coordinator.on_revoked` resumes the partition and flags the
# coordinator as revoked. After that the user `#revoked` callback runs wrapped in
# instrumentation. The `:revoked` job counter is decremented no matter how this ends.
def handle_revoked
  coordinator.on_revoked do
    resume
    coordinator.revoke
  end

  monitor.instrument('consumer.revoke', caller: self)
  monitor.instrument('consumer.revoked', caller: self) { revoked }
ensure
  coordinator.decrement(:revoked)
end

#handle_tick ⇒ Object

Runs the consumer `#tick` method wrapped in instrumentation (reporting)



354
355
356
357
358
359
360
361
# File 'lib/karafka/pro/processing/strategies/default.rb', line 354

# Runs the user `#tick` method wrapped in instrumentation.
#
# The `:periodic` job counter is decremented no matter how the tick ends.
def handle_tick
  monitor.instrument('consumer.tick', caller: self)
  monitor.instrument('consumer.ticked', caller: self) { tick }
ensure
  coordinator.decrement(:periodic)
end

#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean

Note:

We keep track of this offset in case we mark a message as consumed and then get an error while processing another message. In such a case we do not pause on the message we have already processed but rather on the next one. This applies to both the sync and async versions of this method.

Marks message as consumed in an async way.

Parameters:

  • message (Messages::Message)

    last successfully processed message.

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)

    offset metadata string or nil if nothing

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able and that we have lost the partition.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/karafka/pro/processing/strategies/default.rb', line 56

# Marks message as consumed in an async way.
#
# @param message [Messages::Message] last successfully processed message.
# @param offset_metadata [String, nil] offset metadata string or nil if nothing. Defaults to
#   the value held in `@_current_offset_metadata`, which is always cleared by this method.
# @return [Boolean] true if we were able to mark the offset (or there was nothing to mark),
#   false otherwise. False indicates that we were not able and that we have lost the
#   partition.
#
# @note We keep track of this offset in case we mark as consumed and then get an error when
#   processing another message. In such a case we do not pause on the message we have
#   already processed but rather on the next one.
def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
  # If we are inside a transaction then we can just mark as consumed within it
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, true)
  elsif @_in_transaction_marked
    # The offset was already stored by the transaction; only the in-memory state is aligned
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if coordinator.seek_offset.nil?
    # Ignore earlier offsets than the one we already committed
    return true if coordinator.seek_offset > message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, true)
             else
               client.mark_as_consumed(message, offset_metadata)
             end

    # NOTE(review): when storing fails this returns `revoked?`, i.e. `true` once the loss is
    # detected, which contradicts the documented "false means we lost the partition"
    # contract. Confirm the intent before relying on the return value in this path.
    return revoked? unless stored

    coordinator.seek_offset = message.offset + 1
  end

  true
ensure
  # Stored metadata applies only to the very next marking attempt
  @_current_offset_metadata = nil
end

#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean

Marks message as consumed in a sync way.

Parameters:

  • message (Messages::Message)

    last successfully processed message.

  • offset_metadata (String, nil) (defaults to: @_current_offset_metadata)

    offset metadata string or nil if nothing

Returns:

  • (Boolean)

    true if we were able to mark the offset, false otherwise. False indicates that we were not able and that we have lost the partition.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/karafka/pro/processing/strategies/default.rb', line 94

# Marks message as consumed in a sync way.
#
# @param message [Messages::Message] last successfully processed message.
# @param offset_metadata [String, nil] offset metadata string or nil if nothing. Defaults to
#   the value held in `@_current_offset_metadata`, which is always cleared by this method.
# @return [Boolean] true if we were able to mark the offset (or there was nothing to mark),
#   false otherwise. False indicates that we were not able and that we have lost the
#   partition.
def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
  if @_in_transaction
    mark_in_transaction(message, offset_metadata, false)
  elsif @_in_transaction_marked
    # The offset was already stored by the transaction; only the in-memory state is aligned
    mark_in_memory(message)
  else
    # seek offset can be nil only in case `#seek` was invoked with offset reset request
    # In case like this we ignore marking
    return true if coordinator.seek_offset.nil?
    # Ignore earlier offsets than the one we already committed
    return true if coordinator.seek_offset > message.offset
    return false if revoked?

    # If we are not inside a transaction but this is a transactional topic, we mark with
    # artificially created transaction
    stored = if producer.transactional?
               mark_with_transaction(message, offset_metadata, false)
             else
               client.mark_as_consumed!(message, offset_metadata)
             end

    # NOTE(review): when storing fails this returns `revoked?`, i.e. `true` once the loss is
    # detected, which contradicts the documented "false means we lost the partition"
    # contract. Confirm the intent before relying on the return value in this path.
    return revoked? unless stored

    coordinator.seek_offset = message.offset + 1
  end

  true
ensure
  # Stored metadata applies only to the very next marking attempt
  @_current_offset_metadata = nil
end

#mark_in_memory(message) ⇒ Boolean

Marks the current state only in memory as the offset marking has already happened using the producer transaction

Parameters:

  • message (Messages::Message)

    message whose offset we want to reflect in the in-memory state

Returns:

  • (Boolean)

    true if all good, false if we lost assignment and no point in marking



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/karafka/pro/processing/strategies/default.rb', line 254

# Moves only the in-memory seek offset, because the offset itself was already stored as
# part of a producer transaction.
#
# @param message [Messages::Message] message whose offset we want to reflect in memory
# @return [Boolean] true if all good (or nothing to do), false if we lost the assignment
#   and there is no point in marking
def mark_in_memory(message)
  # A nil seek offset means `#seek` requested an offset reset, so marking is skipped.
  # Offsets earlier than the one already committed are ignored as well.
  return true if coordinator.seek_offset.nil? || coordinator.seek_offset > message.offset
  return false if revoked?

  # The transaction already stored this offset, so we must not delegate marking to the
  # client again; aligning the in-memory position is all that is left
  coordinator.seek_offset = message.offset + 1
  true
end

#mark_in_transaction(message, offset_metadata, async) ⇒ Object

Stores the next offset for processing inside of the transaction and stores it in a local accumulator for post-transaction status update

Parameters:

  • message (Messages::Message)

    message we want to commit inside of a transaction

  • offset_metadata (String, nil)

    offset metadata or nil if none

  • async (Boolean)

    should we mark in async or sync way (applicable only to post transaction state synchronization usage as within transaction it is always sync)

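Raises:

  • (Errors::TransactionRequiredError)

    when invoked outside of a transaction

  • (Errors::AssignmentLostError)

    when the partition was revoked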



209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/karafka/pro/processing/strategies/default.rb', line 209

# Stores the next offset for processing inside of the transaction and records it in a local
# accumulator for the post-transaction status update.
#
# @param message [Messages::Message] message we want to commit inside of a transaction
# @param offset_metadata [String, nil] offset metadata or nil if none
# @param async [Boolean] should we mark in async or sync way (applicable only to the
#   post-transaction state synchronization, as within the transaction it is always sync)
# @raise [Errors::TransactionRequiredError] when invoked outside of a transaction
# @raise [Errors::AssignmentLostError] when the partition was revoked
def mark_in_transaction(message, offset_metadata, async)
  raise Errors::TransactionRequiredError unless @_in_transaction
  raise Errors::AssignmentLostError if revoked?

  producer.transaction_mark_as_consumed(
    client,
    message,
    offset_metadata
  )

  # Remember that offsets were stored transactionally, so that later markings only align
  # the in-memory state instead of going through the client again
  @_in_transaction_marked = true
  @_transaction_marked ||= []
  @_transaction_marked << [message, offset_metadata, async]
end

#mark_with_transaction(message, offset_metadata, async) ⇒ Boolean

Marks the message inside a dedicated, internally started transaction. Returns false if marking failed, otherwise true.

Parameters:

  • message (Messages::Message)

    message we want to commit inside of a transaction

  • offset_metadata (String, nil)

    offset metadata or nil if none

  • async (Boolean)

    should we mark in async or sync way (applicable only to post transaction state synchronization usage as within transaction it is always sync)

Returns:

  • (Boolean)

    false if marking failed otherwise true



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/karafka/pro/processing/strategies/default.rb', line 230

# Marks the message inside a dedicated, internally started transaction.
#
# @param message [Messages::Message] message we want to commit inside of a transaction
# @param offset_metadata [String, nil] offset metadata or nil if none
# @param async [Boolean] should we mark in async or sync way (applicable only to the
#   post-transaction state synchronization, as within the transaction it is always sync)
# @return [Boolean] false if marking failed, otherwise true
def mark_with_transaction(message, offset_metadata, async)
  # This flag is used by VPs to differentiate between user initiated transactions and
  # post-execution system transactions.
  @_transaction_internal = true

  transaction do
    mark_in_transaction(message, offset_metadata, async)
  end

  true
# We handle both cases here because this is a private API for internal usage and we want
# the post-user code execution marking with transactional producer to result in a
# boolean state of marking for further framework flow. This is a normalization to make it
# behave the same way as it would behave with a non-transactional one
rescue ::Rdkafka::RdkafkaError, Errors::AssignmentLostError
  false
ensure
  @_transaction_internal = false
end

#store_offset_metadata(offset_metadata) ⇒ Object

Note:

Please be aware that offset metadata set this way will be passed to any marking as consumed, even if it was not user-initiated (for example, in the DLQ flow).

Allows you to set offset metadata that will be used with the upcoming marking as consumed, as long as different offset metadata was not passed explicitly. After it has been used, either via `#mark_as_consumed` or `#mark_as_consumed!`, it is set back to `nil`. This gives the end user the ability to influence metadata on non-user-initiated markings in complex flows.

Parameters:

  • offset_metadata (String, nil)

    metadata we want to store with the upcoming marking as consumed



41
42
43
# File 'lib/karafka/pro/processing/strategies/default.rb', line 41

# Sets offset metadata that will be used by the upcoming `#mark_as_consumed` /
# `#mark_as_consumed!` call when no metadata is passed explicitly. Those methods reset it
# back to nil after any marking attempt.
#
# @param offset_metadata [String, nil] metadata we want to store with the upcoming marking
#   as consumed
# @return [String, nil] the stored metadata
def store_offset_metadata(offset_metadata)
  @_current_offset_metadata = offset_metadata
end

#transaction(active_producer = producer) { ... } ⇒ Object

Note:

Please note that if you provide the producer, it will replace the consumer's producer for the duration of the transaction. This means that if you refer to `Consumer#producer` from other threads (even accidentally), you will get the reassigned producer and not the one initially assigned. This is intentional: it makes the message-producing aliases operate within the transaction, and since the producer is locked during a transaction, it prevents other threads from using it.

Starts producer transaction, saves the transaction context for transactional marking and runs user code in this context

Transactions on a consumer level differ from those initiated by the producer as they allow to mark offsets inside of the transaction. If the transaction is initialized only from the consumer, the offset will be stored in a regular fashion.

Parameters:

  • active_producer (WaterDrop::Producer) (defaults to: producer)

alternative producer instance we may want to use. It is useful when we have a connection pool or any other selective engine for managing multiple producers. If not provided, the default producer taken from `#producer` will be used.

Yields:

  • code that we want to run in a transaction



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/karafka/pro/processing/strategies/default.rb', line 145

# Starts a producer transaction, saves the transaction context for transactional marking,
# and runs the user code in this context.
#
# @param active_producer [WaterDrop::Producer] alternative producer instance we may want to
#   use. If not provided, the producer taken from `#producer` is used. The consumer's
#   producer is swapped for it during the transaction and restored afterwards.
# @yield code that we want to run in a transaction
# @return [true]
# @raise [Errors::TransactionAlreadyInitializedError] when a transaction is already running
# @raise [Errors::AssignmentLostError] when offsets were marked but the partition was
#   revoked before the transaction could finish
def transaction(active_producer = producer)
  default_producer = producer
  self.producer = active_producer

  transaction_started = false
  transaction_completed = false

  # Prevent from nested transactions. It would not make any sense
  raise Errors::TransactionAlreadyInitializedError if @_in_transaction

  transaction_started = true
  @_transaction_marked = []
  @_in_transaction = true
  @_in_transaction_marked = false

  producer.transaction do
    yield

    # Ensure this transaction is rolled back if we have lost the ownership of this
    # transaction. We do it only for transactions that contain offset management as for
    # producer only, this is not relevant.
    raise Errors::AssignmentLostError if @_in_transaction_marked && revoked?

    # Reaching this line means the user code neither raised nor aborted. An abort
    # requested from inside the block is rolled back by the producer without raising, so
    # without this flag we would still move in-memory offsets for markings that were
    # never committed.
    transaction_completed = true
  end

  @_in_transaction = false

  # This offset is already stored in transaction but we set it here anyhow because we
  # want to make sure our internal in-memory state is aligned with the transaction
  #
  # @note We never need to use the blocking `#mark_as_consumed!` here because the offset
  #   anyhow was already stored during the transaction
  #
  # @note Since the offset could have been already stored in Kafka (could have because
  #   you can have transactions without marking), we use the `@_in_transaction_marked`
  #   state to decide if we need to dispatch the offset via client at all
  #   (if post transaction, then we do not have to)
  #
  # @note In theory we could only keep reference to the most recent marking and reject
  #   others. We however do not do it for two reasons:
  #   - User may have non standard flow relying on some alternative order and we want to
  #     mimic this
  #   - Complex strategies like VPs can use this in VPs to mark in parallel without
  #     having to redefine the transactional flow completely
  if transaction_completed
    @_transaction_marked.each do |marking|
      marking.pop ? mark_as_consumed(*marking) : mark_as_consumed!(*marking)
    end
  end

  true
ensure
  self.producer = default_producer

  if transaction_started
    @_transaction_marked.clear
    @_in_transaction = false
    @_in_transaction_marked = false
  end
end