Module: Karafka::Pro::Processing::Strategies::Default
- Includes: Base, Karafka::Processing::Strategies::Default
- Included in: Aj::DlqLrjMom, Aj::DlqMomVp, Aj::LrjMomVp, Aj::MomVp, Karafka::Pro::Processing::Strategies::Dlq::Default, Ftr::Default, Lrj::Default, Lrj::Mom, Mom::Default, Vp::Default
- Defined in: lib/karafka/pro/processing/strategies/default.rb
Overview
No features enabled:
- No manual offset management
- No long running jobs
- No virtual partitions
- Nothing else: just the standard, automatic flow
Constant Summary
- FEATURES = %i[].freeze
  Apply strategy for a non-feature based flow.
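An empty FEATURES list is what identifies this module as the "no features" strategy. The sketch below assumes a selector that picks the strategy whose FEATURES match the enabled features exactly, ignoring order; the module names and `select_strategy` are hypothetical illustrations, not Karafka's actual selection code.

```ruby
# Hypothetical strategy modules: only the idea of a FEATURES constant comes from the docs.
module DefaultStrategy
  # No features at all, the standard automatic flow
  FEATURES = %i[].freeze
end

module MomStrategy
  FEATURES = %i[manual_offset_management].freeze
end

module LrjMomStrategy
  FEATURES = %i[long_running_job manual_offset_management].freeze
end

STRATEGIES = [DefaultStrategy, MomStrategy, LrjMomStrategy].freeze

# Returns the strategy whose FEATURES match the enabled features, ignoring order
def select_strategy(enabled_features)
  wanted = enabled_features.sort
  STRATEGIES.find { |strategy| strategy::FEATURES.sort == wanted } ||
    raise(ArgumentError, "no strategy for #{enabled_features.inspect}")
end

p select_strategy([])                                            # prints DefaultStrategy
p select_strategy(%i[manual_offset_management long_running_job]) # prints LrjMomStrategy
```

Matching on the full, sorted feature set (rather than on "contains feature X") keeps exactly one strategy per combination, which is why a strategy with no features still needs an explicit, empty constant.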
Instance Method Summary
- #handle_after_consume ⇒ Object
  Standard flow without any features.
- #handle_before_consume ⇒ Object
  Increments the number of attempts once per “full” job.
- #handle_before_schedule_consume ⇒ Object
  No actions needed for the standard flow here.
- #handle_before_schedule_tick ⇒ Object
  No action needed for the tick standard flow.
- #handle_consume ⇒ Object
  Runs the user consumption code.
- #handle_revoked ⇒ Object
  Standard flow for revocation.
- #handle_tick ⇒ Object
  Runs the consumer `#tick` method with reporting.
- #mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
  Marks message as consumed in an async way.
- #mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
  Marks message as consumed in a sync way.
- #mark_in_memory(message) ⇒ Boolean
  Marks the current state only in memory, as the offset marking has already happened using the producer transaction.
- #mark_in_transaction(message, offset_metadata, async) ⇒ Object
  Stores the next offset for processing inside the transaction and records it in a local accumulator for the post-transaction status update.
- #mark_with_transaction(message, offset_metadata, async) ⇒ Boolean
  Returns false if marking failed, otherwise true.
- #store_offset_metadata(offset_metadata) ⇒ Object
  Allows setting offset metadata that will be used by the upcoming marking as consumed, as long as different offset metadata was not used.
- #transaction(active_producer = producer) { ... } ⇒ Object
  Starts a producer transaction, saves the transaction context for transactional marking and runs user code in this context.
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_eofed, #handle_idle, #handle_initialized, #handle_shutdown, #handle_wrap
Methods included from Karafka::Processing::Strategies::Base
#handle_idle, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow without any features
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 312

    def handle_after_consume
      coordinator.on_finished do |last_group_message|
        return if revoked?

        if coordinator.success?
          coordinator.pause_tracker.reset

          # Do not mark last message if pause happened. This prevents a scenario where pause
          # is overridden upon rebalance by marking
          return if coordinator.manual_pause?

          mark_as_consumed(last_group_message)
        else
          retry_after_pause
        end
      end
    end
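The branching in `#handle_after_consume` can be modeled without Karafka as a pure function. This is a minimal sketch: `AfterConsumeState` and `after_consume_action` are hypothetical names, and the returned symbols only label what the real method does (reset the pause tracker, mark the last message, or call `retry_after_pause`).

```ruby
# Hypothetical stand-in for the coordinator state; not part of Karafka's API
AfterConsumeState = Struct.new(:revoked, :success, :manual_pause, keyword_init: true)

# Returns the action the standard flow takes once all jobs of a group have finished
def after_consume_action(state)
  # A revoked partition is no longer ours, so nothing is done
  return :noop if state.revoked

  if state.success
    # The pause tracker is reset first; a manual pause then prevents marking so that
    # the pause is not overridden upon rebalance
    return :reset_only if state.manual_pause

    :reset_and_mark_last_message
  else
    :retry_after_pause
  end
end

p after_consume_action(AfterConsumeState.new(revoked: false, success: true, manual_pause: false))
# prints :reset_and_mark_last_message
```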
#handle_before_consume ⇒ Object
Increments the number of attempts once per “full” job. For all virtual partitions (VPs) of a single topic partition, this should also run only once.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 279

    def handle_before_consume
      coordinator.on_started do
        coordinator.pause_tracker.increment
      end
    end
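To illustrate why the increment happens only once per “full” job even when several VP jobs start concurrently, here is a minimal thread-safe model of a run-once-per-group hook. `OnceGroupCoordinator` is hypothetical; it only assumes that `coordinator.on_started` behaves like a latch that is re-armed when a new group of jobs starts, and it is not Karafka's coordinator implementation.

```ruby
# Hypothetical model of a "run once per job group" hook
class OnceGroupCoordinator
  attr_reader :attempts

  def initialize
    @mutex = Mutex.new
    @started = false
    @attempts = 0
  end

  # Re-arms the latch when a new group of jobs is about to run
  def start
    @mutex.synchronize { @started = false }
  end

  # Runs the block only for the first job of the current group
  def on_started
    @mutex.synchronize do
      return if @started

      @started = true
    end

    yield
  end

  def increment_attempt
    @mutex.synchronize { @attempts += 1 }
  end
end

coordinator = OnceGroupCoordinator.new
coordinator.start
4.times.map { Thread.new { coordinator.on_started { coordinator.increment_attempt } } }.each(&:join)
p coordinator.attempts # prints 1
```

The block is yielded outside the mutex because Ruby's `Mutex` is not reentrant: yielding inside `synchronize` would deadlock as soon as the block touches the same lock, as `increment_attempt` does here.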
#handle_before_schedule_consume ⇒ Object
No actions are needed for the standard flow here beyond emitting the `consumer.before_schedule_consume` instrumentation event.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 271

    def handle_before_schedule_consume
      monitor.instrument('consumer.before_schedule_consume', caller: self)

      nil
    end
#handle_before_schedule_tick ⇒ Object
No action is needed for the tick standard flow beyond emitting the `consumer.before_schedule_tick` instrumentation event.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 347

    def handle_before_schedule_tick
      monitor.instrument('consumer.before_schedule_tick', caller: self)

      nil
    end
#handle_consume ⇒ Object
Run the user consumption code
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 286

    def handle_consume
      # We should not run the work at all on a partition that was revoked
      # This can happen primarily when an LRJ job gets to the internal worker queue and
      # this partition is revoked prior to processing.
      unless revoked?
        monitor.instrument('consumer.consume', caller: self)
        monitor.instrument('consumer.consumed', caller: self) do
          consume
        end
      end

      # Mark job as successful
      coordinator.success!(self)
    rescue StandardError => e
      # If failed, mark as failed
      coordinator.failure!(self, e)

      # Re-raise so reported in the consumer
      raise e
    ensure
      # We need to decrease number of jobs that this coordinator coordinates as it has
      # finished
      coordinator.decrement(:consume)
    end
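The bookkeeping in `#handle_consume` (record success, record the failure and re-raise, always decrement in `ensure`) is a general Ruby pattern. The sketch below assumes a hypothetical `JobTracker` in place of the coordinator, with `increment` standing in for the counting done when a job is scheduled; note that, as in the source, a skipped (revoked) job still counts as a success.

```ruby
# Hypothetical stand-in for the coordinator's job bookkeeping
class JobTracker
  attr_reader :running, :successes, :errors

  def initialize
    @running = 0
    @successes = 0
    @errors = []
  end

  def increment
    @running += 1
  end

  def decrement
    @running -= 1
  end

  def success!
    @successes += 1
  end

  def failure!(error)
    @errors << error
  end

  def success?
    @errors.empty?
  end
end

# Mirrors the structure of #handle_consume around an arbitrary block of user code
def run_job(tracker, revoked: false)
  # Skip the user code for a partition that was revoked before the job ran
  yield unless revoked

  tracker.success!
rescue StandardError => e
  tracker.failure!(e)
  # Re-raise so the caller can report the error
  raise e
ensure
  # Always release the job, whether it succeeded, failed or was skipped
  tracker.decrement
end

tracker = JobTracker.new
tracker.increment # stands in for the count taken when the job is scheduled
run_job(tracker) { :processed }

tracker.increment
begin
  run_job(tracker) { raise ArgumentError, "boom" }
rescue ArgumentError
  # The error reaches the caller after being recorded
end

p tracker.running   # prints 0
p tracker.successes # prints 1
p tracker.success?  # prints false
```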
#handle_revoked ⇒ Object
Standard flow for revocation
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 331

    def handle_revoked
      coordinator.on_revoked do
        resume

        coordinator.revoke
      end

      monitor.instrument('consumer.revoke', caller: self)
      monitor.instrument('consumer.revoked', caller: self) do
        revoked
      end
    ensure
      coordinator.decrement(:revoked)
    end
#handle_tick ⇒ Object
Runs the consumer `#tick` method with reporting.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 354

    def handle_tick
      monitor.instrument('consumer.tick', caller: self)
      monitor.instrument('consumer.ticked', caller: self) do
        tick
      end
    ensure
      coordinator.decrement(:periodic)
    end
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
We keep track of this offset in case we mark as consumed and then get an error while processing another message. In such a case, we do not pause on the message we have already processed but rather on the next one. This applies to both the sync and async versions of this method.
Marks message as consumed in an async way.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 56

    def mark_as_consumed(message, offset_metadata = @_current_offset_metadata)
      # If we are inside a transaction then we can just mark as consumed within it
      if @_in_transaction
        mark_in_transaction(message, offset_metadata, true)
      elsif @_in_transaction_marked
        mark_in_memory(message)
      else
        # seek offset can be nil only in case `#seek` was invoked with offset reset request
        # In case like this we ignore marking
        return true if coordinator.seek_offset.nil?
        # Ignore earlier offsets than the one we already committed
        return true if coordinator.seek_offset > message.offset
        return false if revoked?

        # If we are not inside a transaction but this is a transactional topic, we mark with
        # artificially created transaction
        stored = if producer.transactional?
                   mark_with_transaction(message, offset_metadata, true)
                 else
                   client.mark_as_consumed(message, offset_metadata)
                 end

        return revoked? unless stored

        coordinator.seek_offset = message.offset + 1
      end

      true
    ensure
      @_current_offset_metadata = nil
    end
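The guard clauses shared by `#mark_as_consumed`, `#mark_as_consumed!` and `#mark_in_memory` can be exercised in isolation. In this sketch `OffsetState` is hypothetical, the block stands in for the client or transactional store, and a failed store simply returns `false` (the source returns `revoked?` in that case).

```ruby
# Hypothetical model of the coordinator's seek offset plus the revocation flag
class OffsetState
  attr_accessor :seek_offset, :revoked

  def initialize(seek_offset:, revoked: false)
    @seek_offset = seek_offset
    @revoked = revoked
  end

  # Yields the offset to a "store" step; a truthy block result means "stored"
  def mark(offset)
    # A nil seek offset means `#seek` was called with an offset reset request: ignore marking
    return true if seek_offset.nil?
    # Offsets earlier than the one already committed are ignored
    return true if seek_offset > offset
    return false if revoked

    # Simplification: the source returns `revoked?` here instead of a plain false
    return false unless yield(offset)

    # The next offset to process is the one right after the marked message
    self.seek_offset = offset + 1
    true
  end
end

committed = []
state = OffsetState.new(seek_offset: 10)
state.mark(12) { |offset| committed << offset } # stored, seek offset moves to 13
state.mark(11) { |offset| committed << offset } # ignored, 11 is below the seek offset
p committed         # prints [12]
p state.seek_offset # prints 13
```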
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
Marks message as consumed in a sync way.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 94

    def mark_as_consumed!(message, offset_metadata = @_current_offset_metadata)
      if @_in_transaction
        mark_in_transaction(message, offset_metadata, false)
      elsif @_in_transaction_marked
        mark_in_memory(message)
      else
        # seek offset can be nil only in case `#seek` was invoked with offset reset request
        # In case like this we ignore marking
        return true if coordinator.seek_offset.nil?
        # Ignore earlier offsets than the one we already committed
        return true if coordinator.seek_offset > message.offset
        return false if revoked?

        # If we are not inside a transaction but this is a transactional topic, we mark with
        # artificially created transaction
        stored = if producer.transactional?
                   mark_with_transaction(message, offset_metadata, false)
                 else
                   client.mark_as_consumed!(message, offset_metadata)
                 end

        return revoked? unless stored

        coordinator.seek_offset = message.offset + 1
      end

      true
    ensure
      @_current_offset_metadata = nil
    end
#mark_in_memory(message) ⇒ Boolean
Marks the current state only in memory as the offset marking has already happened using the producer transaction
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 254

    def mark_in_memory(message)
      # seek offset can be nil only in case `#seek` was invoked with offset reset request
      # In case like this we ignore marking
      return true if coordinator.seek_offset.nil?
      # Ignore earlier offsets than the one we already committed
      return true if coordinator.seek_offset > message.offset
      return false if revoked?

      # If we have already marked this successfully in a transaction that was running
      # we should not mark it again with the client offset delegation but instead we should
      # just align the in-memory state
      coordinator.seek_offset = message.offset + 1

      true
    end
#mark_in_transaction(message, offset_metadata, async) ⇒ Object
Stores the next offset for processing inside of the transaction and stores it in a local accumulator for post-transaction status update
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 209

    def mark_in_transaction(message, offset_metadata, async)
      raise Errors::TransactionRequiredError unless @_in_transaction
      raise Errors::AssignmentLostError if revoked?

      producer.transaction_mark_as_consumed(
        client,
        message,
        offset_metadata
      )

      @_in_transaction_marked = true
      @_transaction_marked ||= []
      @_transaction_marked << [message, offset_metadata, async]
    end
#mark_with_transaction(message, offset_metadata, async) ⇒ Boolean
Returns false if marking failed, otherwise true.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 230

    def mark_with_transaction(message, offset_metadata, async)
      # This flag is used by VPs to differentiate between user initiated transactions and
      # post-execution system transactions.
      @_transaction_internal = true

      transaction do
        mark_in_transaction(message, offset_metadata, async)
      end

      true
    # We handle both cases here because this is a private API for internal usage and we want
    # the post-user code execution marking with transactional producer to result in a
    # boolean state of marking for further framework flow. This is a normalization to make it
    # behave the same way as it would behave with a non-transactional one
    rescue ::Rdkafka::RdkafkaError, Errors::AssignmentLostError
      false
    ensure
      @_transaction_internal = false
    end
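`#mark_with_transaction` turns two expected exception types into a `false` return value and always resets its internal flag. The sketch models that normalization with hypothetical error classes standing in for `::Rdkafka::RdkafkaError` and `Errors::AssignmentLostError`, so it runs without Karafka or rdkafka installed; `InternalMarker` is likewise hypothetical, and any other exception still propagates.

```ruby
# Hypothetical stand-ins so the sketch runs without rdkafka/Karafka
class FakeRdkafkaError < StandardError; end
class FakeAssignmentLostError < StandardError; end

class InternalMarker
  def initialize
    @transaction_internal = false
  end

  def transaction_internal?
    @transaction_internal
  end

  # Runs the marking block and normalizes the expected failures into `false`
  def mark_with_transaction
    # Lets other code tell system-initiated marking apart from user transactions
    @transaction_internal = true
    yield
    true
  rescue FakeRdkafkaError, FakeAssignmentLostError
    false
  ensure
    # Reset on every path, including the failing ones
    @transaction_internal = false
  end
end

marker = InternalMarker.new
p marker.mark_with_transaction { :stored }                       # prints true
p marker.mark_with_transaction { raise FakeAssignmentLostError } # prints false
p marker.transaction_internal?                                   # prints false
```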
#store_offset_metadata(offset_metadata) ⇒ Object
Please be aware that offset metadata set this way will be passed to any marking as consumed, even if it was not user-initiated, for example in the DLQ flow.
Allows setting offset metadata that will be used by the upcoming marking as consumed, as long as different offset metadata was not used. Once it has been used, either via `#mark_as_consumed` or `#mark_as_consumed!`, it is set back to `nil`. This gives the end user the ability to influence metadata on non-user-initiated markings in complex flows.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 41

    def store_offset_metadata(offset_metadata)
      @_current_offset_metadata = offset_metadata
    end
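The one-shot behavior of stored offset metadata relies on two Ruby details visible in the source: the default argument `offset_metadata = @_current_offset_metadata` is evaluated at call time, and the `ensure` clause clears the stored value after every marking. A minimal sketch with a hypothetical `MetadataMarker` class that records markings instead of committing them:

```ruby
# Hypothetical class illustrating "sticky until the next marking" metadata
class MetadataMarker
  attr_reader :marked

  def initialize
    @marked = []
    @current_offset_metadata = nil
  end

  def store_offset_metadata(offset_metadata)
    @current_offset_metadata = offset_metadata
  end

  # The default argument is evaluated on each call, so it picks up the stored metadata
  def mark_as_consumed(offset, offset_metadata = @current_offset_metadata)
    @marked << [offset, offset_metadata]
    true
  ensure
    # Stored metadata applies to a single marking only, explicit or not
    @current_offset_metadata = nil
  end
end

metadata_marker = MetadataMarker.new
metadata_marker.store_offset_metadata("checkpoint-1")
metadata_marker.mark_as_consumed(5)
metadata_marker.mark_as_consumed(6)
p metadata_marker.marked # prints [[5, "checkpoint-1"], [6, nil]]
```

Passing metadata explicitly also clears the stored value, which matches the "as long as different offset metadata was not used" wording above.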
#transaction(active_producer = producer) { ... } ⇒ Object
Please note that if you provide the producer, it will replace the consumer's producer for the duration of the transaction. This means that if you refer to `Consumer#producer` from other threads, even accidentally, you will get the reassigned producer and not the initially assigned one. This is done so that the message-producing aliases operate from within transactions; since the producer is locked during a transaction, other threads are prevented from using it.
Starts producer transaction, saves the transaction context for transactional marking and runs user code in this context
Transactions on the consumer level differ from those initiated by the producer, as they allow marking offsets inside the transaction. If the transaction is initialized only from the consumer, the offset will be stored in a regular fashion.
    # File 'lib/karafka/pro/processing/strategies/default.rb', line 145

    def transaction(active_producer = producer)
      default_producer = producer
      self.producer = active_producer

      transaction_started = false

      # Prevent from nested transactions. It would not make any sense
      raise Errors::TransactionAlreadyInitializedError if @_in_transaction

      transaction_started = true
      @_transaction_marked = []
      @_in_transaction = true
      @_in_transaction_marked = false

      producer.transaction do
        yield

        # Ensure this transaction is rolled back if we have lost the ownership of this
        # transaction. We do it only for transactions that contain offset management as for
        # producer only, this is not relevant.
        raise Errors::AssignmentLostError if @_in_transaction_marked && revoked?
      end

      @_in_transaction = false

      # This offset is already stored in transaction but we set it here anyhow because we
      # want to make sure our internal in-memory state is aligned with the transaction
      #
      # @note We never need to use the blocking `#mark_as_consumed!` here because the offset
      #   anyhow was already stored during the transaction
      #
      # @note Since the offset could have been already stored in Kafka (could have because
      #   you can have transactions without marking), we use the `@_in_transaction_marked`
      #   state to decide if we need to dispatch the offset via client at all
      #   (if post transaction, then we do not have to)
      #
      # @note In theory we could only keep reference to the most recent marking and reject
      #   others. We however do not do it for two reasons:
      #   - User may have non standard flow relying on some alternative order and we want to
      #     mimic this
      #   - Complex strategies like VPs can use this in VPs to mark in parallel without
      #     having to redefine the transactional flow completely
      @_transaction_marked.each do |marking|
        marking.pop ? mark_as_consumed(*marking) : mark_as_consumed!(*marking)
      end

      true
    ensure
      self.producer = default_producer

      if transaction_started
        @_transaction_marked.clear
        @_in_transaction = false
        @_in_transaction_marked = false
      end
    end
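The control flow of `#transaction` (nesting guard, collecting markings made inside the block, replaying them afterwards with the recorded async flag, and resetting state only in the call that started the transaction) can be sketched without a producer. `TransactionalConsumerSketch` is hypothetical; it does not model the producer swap, the Kafka transaction itself, or the `AssignmentLostError` rollback check.

```ruby
# Hypothetical, producer-free model of the #transaction bookkeeping
class TransactionalConsumerSketch
  class TransactionAlreadyInitializedError < StandardError; end
  class TransactionRequiredError < StandardError; end

  attr_reader :replayed

  def initialize
    @in_transaction = false
    @transaction_marked = []
    @replayed = []
  end

  def transaction
    transaction_started = false

    # Nested transactions are rejected before any state is touched
    raise TransactionAlreadyInitializedError if @in_transaction

    transaction_started = true
    @transaction_marked = []
    @in_transaction = true

    # In the source, this block runs inside producer.transaction
    yield

    @in_transaction = false

    # Replay markings in their original order; the last element is the async flag.
    # In the source these calls end up in #mark_in_memory.
    @transaction_marked.each do |marking|
      marking.pop ? replay(:async, *marking) : replay(:sync, *marking)
    end

    true
  ensure
    # Only the call that started the transaction may reset its state
    if transaction_started
      @transaction_marked.clear
      @in_transaction = false
    end
  end

  def mark_in_transaction(offset, offset_metadata, async)
    raise TransactionRequiredError unless @in_transaction

    @transaction_marked << [offset, offset_metadata, async]
  end

  private

  def replay(mode, offset, offset_metadata)
    @replayed << [mode, offset, offset_metadata]
  end
end

consumer = TransactionalConsumerSketch.new
consumer.transaction do
  consumer.mark_in_transaction(1, nil, true)
  consumer.mark_in_transaction(2, "meta", false)
end
p consumer.replayed # prints [[:async, 1, nil], [:sync, 2, "meta"]]
```

The `transaction_started` flag is what keeps a rejected nested call from wiping the outer transaction's accumulated markings in its `ensure` clause.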