Module: Karafka::Pro::Processing::Strategies::Default
- Includes:
- Base, Karafka::Processing::Strategies::Default
- Included in:
- Aj::DlqLrjMom, Aj::DlqMomVp, Aj::LrjMomVp, Aj::MomVp, Karafka::Pro::Processing::Strategies::Dlq::Default, Ftr::Default, Lrj::Default, Lrj::Mom, Mom::Default, Vp::Default
- Defined in:
- lib/karafka/pro/processing/strategies/default.rb
Overview
No features enabled. No manual offset management No long running jobs No virtual partitions Nothing. Just standard, automatic flow
Constant Summary collapse
- FEATURES =
Apply strategy for a non-feature based flow
%i[].freeze
Instance Method Summary collapse
-
#handle_after_consume ⇒ Object
Standard flow without any features.
-
#handle_before_consume ⇒ Object
Increment number of attempts per one “full” job.
-
#handle_before_schedule_consume ⇒ Object
No actions needed for the standard flow here.
-
#handle_before_schedule_tick ⇒ Object
No action needed for the tick standard flow.
-
#handle_consume ⇒ Object
Run the user consumption code.
-
#handle_revoked ⇒ Object
Standard flow for revocation.
-
#handle_tick ⇒ Object
Runs the consumer ‘#tick` method with reporting.
-
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
Marks message as consumed in an async way.
-
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
Marks message as consumed in a sync way.
-
#mark_in_transaction(message, offset_metadata, async) ⇒ Object
Stores the next offset for processing inside of the transaction and stores it in a local accumulator for post-transaction status update.
-
#store_offset_metadata(offset_metadata) ⇒ Object
Allows to set offset metadata that will be used with the upcoming marking as consumed as long as a different offset metadata was not used.
-
#transaction(active_producer = producer, &block) ⇒ Object
Starts producer transaction, saves the transaction context for transactional marking and runs user code in this context.
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_eofed, #handle_idle, #handle_initialized, #handle_shutdown
Methods included from Karafka::Processing::Strategies::Base
#handle_idle, #handle_shutdown
Instance Method Details
#handle_after_consume ⇒ Object
Standard flow without any features
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 227 def handle_after_consume coordinator.on_finished do || return if revoked? if coordinator.success? coordinator.pause_tracker.reset # Do not mark last message if pause happened. This prevents a scenario where pause # is overridden upon rebalance by marking return if coordinator.manual_pause? mark_as_consumed() else retry_after_pause end end end |
#handle_before_consume ⇒ Object
Increment number of attempts per one “full” job. For all VP on a single topic partition this also should run once.
194 195 196 197 198 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 194 def handle_before_consume coordinator.on_started do coordinator.pause_tracker.increment end end |
#handle_before_schedule_consume ⇒ Object
No actions needed for the standard flow here
186 187 188 189 190 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 186 def handle_before_schedule_consume Karafka.monitor.instrument('consumer.before_schedule_consume', caller: self) nil end |
#handle_before_schedule_tick ⇒ Object
No action needed for the tick standard flow
262 263 264 265 266 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 262 def handle_before_schedule_tick Karafka.monitor.instrument('consumer.before_schedule_tick', caller: self) nil end |
#handle_consume ⇒ Object
Run the user consumption code
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 201 def handle_consume # We should not run the work at all on a partition that was revoked # This can happen primarily when an LRJ job gets to the internal worker queue and # this partition is revoked prior processing. unless revoked? Karafka.monitor.instrument('consumer.consume', caller: self) Karafka.monitor.instrument('consumer.consumed', caller: self) do consume end end # Mark job as successful coordinator.success!(self) rescue StandardError => e # If failed, mark as failed coordinator.failure!(self, e) # Re-raise so reported in the consumer raise e ensure # We need to decrease number of jobs that this coordinator coordinates as it has # finished coordinator.decrement(:consume) end |
#handle_revoked ⇒ Object
Standard flow for revocation
246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 246 def handle_revoked coordinator.on_revoked do resume coordinator.revoke end Karafka.monitor.instrument('consumer.revoke', caller: self) Karafka.monitor.instrument('consumer.revoked', caller: self) do revoked end ensure coordinator.decrement(:revoked) end |
#handle_tick ⇒ Object
Runs the consumer ‘#tick` method with reporting
269 270 271 272 273 274 275 276 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 269 def handle_tick Karafka.monitor.instrument('consumer.tick', caller: self) Karafka.monitor.instrument('consumer.ticked', caller: self) do tick end ensure coordinator.decrement(:periodic) end |
#mark_as_consumed(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
We keep track of this offset in case we would mark as consumed and got error when processing another message. In case like this we do not pause on the message we’ve already processed but rather at the next one. This applies to both sync and async versions of this method.
Marks message as consumed in an async way.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 56 def mark_as_consumed(, = @_current_offset_metadata) if @_in_transaction mark_in_transaction(, , true) else # seek offset can be nil only in case `#seek` was invoked with offset reset request # In case like this we ignore marking return true if coordinator.seek_offset.nil? # Ignore earlier offsets than the one we already committed return true if coordinator.seek_offset > .offset return false if revoked? return revoked? unless client.mark_as_consumed(, ) coordinator.seek_offset = .offset + 1 end true ensure @_current_offset_metadata = nil end |
#mark_as_consumed!(message, offset_metadata = @_current_offset_metadata) ⇒ Boolean
Marks message as consumed in a sync way.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 82 def mark_as_consumed!(, = @_current_offset_metadata) if @_in_transaction mark_in_transaction(, , false) else # seek offset can be nil only in case `#seek` was invoked with offset reset request # In case like this we ignore marking return true if coordinator.seek_offset.nil? # Ignore earlier offsets than the one we already committed return true if coordinator.seek_offset > .offset return false if revoked? return revoked? unless client.mark_as_consumed!(, ) coordinator.seek_offset = .offset + 1 end true ensure @_current_offset_metadata = nil end |
#mark_in_transaction(message, offset_metadata, async) ⇒ Object
Stores the next offset for processing inside of the transaction and stores it in a local accumulator for post-transaction status update
171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 171 def mark_in_transaction(, , async) raise Errors::TransactionRequiredError unless @_in_transaction raise Errors::AssignmentLostError if revoked? producer.transaction_mark_as_consumed( client, , ) @_transaction_marked ||= [] @_transaction_marked << [, , async] end |
#store_offset_metadata(offset_metadata) ⇒ Object
Please be aware, that offset metadata set this way will be passed to any marking as consumed even if it was not user initiated. For example in the DLQ flow.
Allows to set offset metadata that will be used with the upcoming marking as consumed as long as a different offset metadata was not used. After it was used either via ‘#mark_as_consumed` or `#mark_as_consumed!` it will be set back to `nil`. It is done that way to provide the end user with ability to influence metadata on the non-user initiated markings in complex flows.
41 42 43 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 41 def () @_current_offset_metadata = end |
#transaction(active_producer = producer, &block) ⇒ Object
Please note, that if you provide the producer, it will reassign the producer of the consumer for the transaction time. This means, that in case you would even accidentally refer to ‘Consumer#producer` from other threads, it will contain the reassigned producer and not the initially used/assigned producer. It is done that way, so the message producing aliases operate from within transactions and since the producer in transaction is locked, it will prevent other threads from using it.
Starts producer transaction, saves the transaction context for transactional marking and runs user code in this context
Transactions on a consumer level differ from those initiated by the producer as they allow to mark offsets inside of the transaction. If the transaction is initialized only from the consumer, the offset will be stored in a regular fashion.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/karafka/pro/processing/strategies/default.rb', line 123 def transaction(active_producer = producer, &block) default_producer = producer self.producer = active_producer transaction_started = false # Prevent from nested transactions. It would not make any sense raise Errors::TransactionAlreadyInitializedError if @_in_transaction transaction_started = true @_transaction_marked = [] @_in_transaction = true producer.transaction(&block) @_in_transaction = false # This offset is already stored in transaction but we set it here anyhow because we # want to make sure our internal in-memory state is aligned with the transaction # # @note We never need to use the blocking `#mark_as_consumed!` here because the offset # anyhow was already stored during the transaction # # @note In theory we could only keep reference to the most recent marking and reject # others. We however do not do it for two reasons: # - User may have non standard flow relying on some alternative order and we want to # mimic this # - Complex strategies like VPs can use this in VPs to mark in parallel without # having to redefine the transactional flow completely @_transaction_marked.each do |marking| marking.pop ? mark_as_consumed(*marking) : mark_as_consumed!(*marking) end ensure self.producer = default_producer if transaction_started @_transaction_marked.clear @_in_transaction = false end end |