Module: Karafka::Processing::Strategies::Default
- Includes:
- Base
- Included in:
- Karafka::Pro::Processing::Strategies::Default, Dlq, Mom
- Defined in:
- lib/karafka/processing/strategies/default.rb
Overview
No features enabled. No manual offset management. No long running jobs. Nothing. Just the standard, automatic flow.
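For illustration, a minimal sketch of a setup that ends up using this strategy: a plain consumer and a routing entry with no extra features enabled. The EventsConsumer class and the :events topic name are hypothetical.

# With no manual offset management, DLQ or long running jobs configured,
# this topic is processed with the Default strategy and offsets are
# committed automatically after successful batches.
class EventsConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      puts "Processing #{message.topic}/#{message.partition} @ #{message.offset}"
    end
  end
end

Karafka::App.routes.draw do
  topic :events do
    consumer EventsConsumer
  end
end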
Constant Summary collapse
- FEATURES =
Apply strategy for a non-feature based flow
%i[].freeze
Instance Method Summary collapse
-
#commit_offsets(async: true) ⇒ Boolean
Triggers an async offset commit.
-
#commit_offsets! ⇒ Boolean
Triggers a synchronous offsets commit to Kafka.
-
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok.
-
#handle_before_consume ⇒ Object
Increments the number of attempts.
-
#handle_consume ⇒ Object
Runs the user consumption code.
-
#handle_eofed ⇒ Object
Runs the consumer `#eofed` method with reporting.
-
#handle_idle ⇒ Object
Code that should run on idle runs, when no messages are available.
-
#handle_initialized ⇒ Object
Runs the post-creation, post-assignment code.
-
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition.
-
#handle_shutdown ⇒ Object
Runs the shutdown code.
-
#handle_wrap(action, &block) ⇒ Object
Runs the given action wrapped with the consumer's wrapper method code.
-
#mark_as_consumed(message) ⇒ Boolean
Marks message as consumed in an async way.
-
#mark_as_consumed!(message) ⇒ Boolean
Marks message as consumed in a sync way.
Instance Method Details
#commit_offsets(async: true) ⇒ Boolean
Due to its async nature, this may not fully represent the offset state in some edge cases (for example, when going beyond max.poll.interval.ms).
Triggers an async offset commit
# File 'lib/karafka/processing/strategies/default.rb', line 94

def commit_offsets(async: true)
  # Do not commit if we already lost the assignment
  return false if revoked?
  return true if client.commit_offsets(async: async)

  # This will once more check the librdkafka revocation status and will revoke the
  # coordinator in case it was not revoked
  revoked?
end
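A hedged usage sketch from inside a consumer (the OrdersConsumer class is hypothetical); since the commit is async, the returned boolean mostly tells us whether the assignment is still owned:

class OrdersConsumer < Karafka::BaseConsumer
  def consume
    messages.each { |message| mark_as_consumed(message) }

    # Request an async commit; false means the assignment was already lost
    if commit_offsets
      Karafka.logger.debug("Offsets scheduled for commit on #{topic.name}/#{partition}")
    end
  end
end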
#commit_offsets! ⇒ Boolean
This is fully synchronous, hence its result can be used in DB transactions etc. as a way of making sure that we still own the partition.
Triggers a synchronous offsets commit to Kafka
# File 'lib/karafka/processing/strategies/default.rb', line 109

def commit_offsets!
  commit_offsets(async: false)
end
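A minimal sketch of the transactional pattern mentioned above, assuming an ActiveRecord-backed application (the Payment model is hypothetical): the synchronous commit result decides whether the DB side effects are kept.

def consume
  messages.each do |message|
    ActiveRecord::Base.transaction do
      Payment.create!(message.payload)
      mark_as_consumed(message)

      # Roll the DB changes back if we cannot confirm we still own the partition
      raise ActiveRecord::Rollback unless commit_offsets!
    end
  end
end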
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok. If there was a processing error, we will pause and continue from the next message (that is, the one +1 from the last one that was successfully marked as consumed).
# File 'lib/karafka/processing/strategies/default.rb', line 151

def handle_after_consume
  return if revoked?

  if coordinator.success?
    coordinator.pause_tracker.reset

    # We should not move the offset automatically when the partition was paused
    # If we would not do this upon a revocation during the pause time, a different process
    # would pick not from the place where we paused but from the offset that would be
    # automatically committed here
    return if coordinator.manual_pause?

    mark_as_consumed(messages.last)
  else
    retry_after_pause
  end
end
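To illustrate that flow with a sketch (process is a hypothetical helper): if each message is marked as it is processed, a raised error pauses the partition and processing later resumes from the message that failed, not from the beginning of the batch.

def consume
  messages.each do |message|
    process(message)          # hypothetical business logic that may raise
    mark_as_consumed(message) # on error, the retry starts after the last marked message
  end
end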
#handle_before_consume ⇒ Object
Increments the number of attempts.
# File 'lib/karafka/processing/strategies/default.rb', line 114

def handle_before_consume
  coordinator.pause_tracker.increment
end
#handle_consume ⇒ Object
Runs the user consumption code.
# File 'lib/karafka/processing/strategies/default.rb', line 130

def handle_consume
  monitor.instrument('consumer.consume', caller: self)
  monitor.instrument('consumer.consumed', caller: self) do
    consume
  end

  # Mark job as successful
  coordinator.success!(self)
rescue StandardError => e
  coordinator.failure!(self, e)

  # Re-raise so reported in the consumer
  raise e
ensure
  # We need to decrease number of jobs that this coordinator coordinates as it has finished
  coordinator.decrement(:consume)
end
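Because consumption is wrapped in monitor instrumentation, it can be observed from the outside. A sketch of subscribing to the consumer.consumed event; the logging itself is illustrative:

Karafka.monitor.subscribe('consumer.consumed') do |event|
  # The instrumented payload carries the consumer instance as :caller
  consumer = event[:caller]

  Karafka.logger.info(
    "Consumed #{consumer.messages.size} message(s) from #{consumer.topic.name}"
  )
end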
#handle_eofed ⇒ Object
Runs the consumer `#eofed` method with reporting.
# File 'lib/karafka/processing/strategies/default.rb', line 177

def handle_eofed
  monitor.instrument('consumer.eof', caller: self)
  monitor.instrument('consumer.eofed', caller: self) do
    eofed
  end
ensure
  coordinator.decrement(:eofed)
end
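A sketch of the consumer-side hook this dispatches to, assuming EOF notifications are enabled for the topic (which requires enable.partition.eof in the kafka settings); the LogsConsumer class is hypothetical:

class LogsConsumer < Karafka::BaseConsumer
  # Invoked when the end of the assigned partition is reached
  def eofed
    Karafka.logger.info("Reached EOF on #{topic.name}/#{partition}")
  end
end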
#handle_idle ⇒ Object
Code that should run on idle runs, when no messages are available.
# File 'lib/karafka/processing/strategies/default.rb', line 170

def handle_idle
  nil
ensure
  coordinator.decrement(:idle)
end
#handle_initialized ⇒ Object
It runs in the listener loop. Should not be used for anything heavy or with any potential errors. Mostly for initialization of states, etc.
Runs the post-creation, post-assignment code
# File 'lib/karafka/processing/strategies/default.rb', line 37

def handle_initialized
  monitor.instrument('consumer.initialize', caller: self)
  monitor.instrument('consumer.initialized', caller: self) do
    initialized
  end
end
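A sketch of the matching consumer hook; as noted above it runs in the listener loop, so it should only set up lightweight state (the buffer and MetricsConsumer name are illustrative):

class MetricsConsumer < Karafka::BaseConsumer
  # Runs once after the consumer instance is created and assigned
  def initialized
    @buffer = []
  end

  def consume
    messages.each { |message| @buffer << message.payload }
  end
end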
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition. Otherwise the underlying librdkafka would not know we may want to continue processing and the pause could, in theory, last forever.
# File 'lib/karafka/processing/strategies/default.rb', line 189

def handle_revoked
  resume

  coordinator.revoke

  monitor.instrument('consumer.revoke', caller: self)
  monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
ensure
  coordinator.decrement(:revoked)
end
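A sketch of the consumer-side #revoked hook this invokes, useful for releasing partition-bound resources (the class name and cleanup are hypothetical):

class OrdersConsumer < Karafka::BaseConsumer
  # Called when this process loses the partition assignment
  def revoked
    # Release any locks or per-partition state held by this consumer
    Karafka.logger.warn("Lost #{topic.name}/#{partition}, cleaning up")
  end
end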
#handle_shutdown ⇒ Object
Runs the shutdown code
# File 'lib/karafka/processing/strategies/default.rb', line 203

def handle_shutdown
  monitor.instrument('consumer.shutting_down', caller: self)
  monitor.instrument('consumer.shutdown', caller: self) do
    shutdown
  end
ensure
  coordinator.decrement(:shutdown)
end
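A sketch of the consumer-side #shutdown hook this calls; the @connection instance variable is hypothetical and stands for any resource worth closing on process shutdown:

class OrdersConsumer < Karafka::BaseConsumer
  # Called once when the process is shutting down
  def shutdown
    # Flush buffers, close connections, etc.
    @connection&.close
  end
end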
#handle_wrap(action, &block) ⇒ Object
Runs the given action wrapped with the consumer's wrapper method code.
# File 'lib/karafka/processing/strategies/default.rb', line 122

def handle_wrap(action, &block)
  monitor.instrument('consumer.wrap', caller: self)
  monitor.instrument('consumer.wrapped', caller: self) do
    wrap(action, &block)
  end
end
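A sketch of a consumer-side #wrap override that this would invoke; it is assumed here that the override receives the action name and must yield for the wrapped work to run, and the ActiveRecord connection pool usage is purely illustrative:

class OrdersConsumer < Karafka::BaseConsumer
  # Wraps every action of this consumer (consume, revoked, shutdown, etc.)
  def wrap(_action)
    ActiveRecord::Base.connection_pool.with_connection do
      yield
    end
  end

  def consume
    messages.each { |message| Order.create!(message.payload) }
  end
end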
#mark_as_consumed(message) ⇒ Boolean
We keep track of this offset in case we mark a message as consumed and then get an error when processing another message. In a case like this we do not pause on the message we've already processed but rather on the next one. This applies to both sync and async versions of this method.
Marks message as consumed in an async way.
# File 'lib/karafka/processing/strategies/default.rb', line 54

def mark_as_consumed(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if coordinator.seek_offset.nil?

  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?
  return revoked? unless client.mark_as_consumed(message)

  coordinator.seek_offset = message.offset + 1

  true
end
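A usage sketch: marking every message in the batch and stopping early if the assignment was lost (process is a hypothetical helper):

def consume
  messages.each do |message|
    process(message)

    # A false return value indicates the partition was revoked
    return unless mark_as_consumed(message)
  end
end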
#mark_as_consumed!(message) ⇒ Boolean
Marks message as consumed in a sync way.
# File 'lib/karafka/processing/strategies/default.rb', line 73

def mark_as_consumed!(message)
  # seek offset can be nil only in case `#seek` was invoked with offset reset request
  # In case like this we ignore marking
  return true if coordinator.seek_offset.nil?

  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?
  return revoked? unless client.mark_as_consumed!(message)

  coordinator.seek_offset = message.offset + 1

  true
end
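The synchronous variant in a similar sketch; it blocks until Kafka confirms the commit, so a false result is a strong signal that the partition is no longer owned (process is a hypothetical helper):

def consume
  messages.each do |message|
    process(message) # hypothetical business logic

    # Blocks until the offset commit is confirmed; false means the partition was lost
    break unless mark_as_consumed!(message)
  end
end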