Module: Karafka::Processing::Strategies::Default
- Includes:
- Base
- Included in:
- Karafka::Pro::Processing::Strategies::Default, Dlq, Mom
- Defined in:
- lib/karafka/processing/strategies/default.rb
Overview
No features enabled: no manual offset management, no long-running jobs, nothing. Just the standard, automatic flow.
Constant Summary collapse
- FEATURES =
Applies this strategy to a non-feature-based flow
%i[].freeze
Instance Method Summary collapse
-
#commit_offsets(async: true) ⇒ Boolean
Triggers an async offset commit.
-
#commit_offsets! ⇒ Boolean
Triggers a synchronous offsets commit to Kafka.
-
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went ok.
-
#handle_before_consume ⇒ Object
Increments the number of attempts.
-
#handle_before_enqueue ⇒ Object
No actions needed for the standard flow here.
-
#handle_consume ⇒ Object
Run the user consumption code.
-
#handle_idle ⇒ Object
Code that should run on idle runs, when no messages are available.
-
#handle_revoked ⇒ Object
We need to always un-pause the processing in case we have lost a given partition.
-
#handle_shutdown ⇒ Object
Runs the shutdown code.
-
#mark_as_consumed(message) ⇒ Boolean
Marks message as consumed in an async way.
-
#mark_as_consumed!(message) ⇒ Boolean
Marks message as consumed in a sync way.
Instance Method Details
#commit_offsets(async: true) ⇒ Boolean
Note: due to its async nature, this may not fully represent the offset state in some edge cases (for example, when going beyond max.poll.interval.ms).
Triggers an async offset commit
# File 'lib/karafka/processing/strategies/default.rb', line 60

def commit_offsets(async: true)
  # Do not commit if we already lost the assignment
  return false if revoked?

  return true if client.commit_offsets(async: async)

  # This will once more check the librdkafka revocation status and will revoke the
  # coordinator in case it was not revoked
  revoked?
end
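The flow above can be reproduced in a minimal plain-Ruby sketch. `FakeClient` and `CommitFlow` below are hypothetical stand-ins for illustration only, not part of the Karafka API:

```ruby
# Hypothetical stand-in for the client: its commit_offsets reports success/failure
FakeClient = Struct.new(:result) do
  def commit_offsets(async: true)
    result
  end
end

# Hypothetical stand-in mirroring the guard order in Default#commit_offsets
class CommitFlow
  def initialize(client, revoked)
    @client = client
    @revoked = revoked
  end

  def revoked?
    @revoked
  end

  def commit_offsets(async: true)
    return false if revoked?                              # assignment already lost
    return true if @client.commit_offsets(async: async)   # commit went through

    revoked?                                              # commit failed: re-check revocation
  end
end

puts CommitFlow.new(FakeClient.new(true), false).commit_offsets   # => true
puts CommitFlow.new(FakeClient.new(true), true).commit_offsets    # => false
puts CommitFlow.new(FakeClient.new(false), false).commit_offsets  # => false
```

This shows why the return value is only a hint in the async case: a `false` can mean either a lost assignment or a failed commit attempt.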
#commit_offsets! ⇒ Boolean
This is fully synchronous, hence its result can be used in DB transactions, etc., as a way of making sure that we still own the partition.
Triggers a synchronous offsets commit to Kafka
# File 'lib/karafka/processing/strategies/default.rb', line 75

def commit_offsets!
  commit_offsets(async: false)
end
#handle_after_consume ⇒ Object
Standard flow marks work as consumed and moves on if everything went OK. If there was a processing error, we will pause and continue from the next message (the next one, i.e. +1 from the last one that was successfully marked as consumed).
# File 'lib/karafka/processing/strategies/default.rb', line 111

def handle_after_consume
  return if revoked?

  if coordinator.success?
    coordinator.pause_tracker.reset

    # We should not move the offset automatically when the partition was paused
    # If we would not do this upon a revocation during the pause time, a different process
    # would pick not from the place where we paused but from the offset that would be
    # automatically committed here
    return if coordinator.manual_pause?

    mark_as_consumed(messages.last)
  else
    retry_after_pause
  end
end
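The decision tree above can be condensed into a plain-Ruby sketch. The function name and returned symbols below are illustrative stand-ins, not Karafka API:

```ruby
# Hypothetical condensation of the after-consume decision tree:
# each branch returns a symbol naming the action the real strategy takes.
def after_consume_decision(revoked:, success:, manual_pause:)
  return :noop if revoked                  # partition lost, nothing to do
  return :retry_after_pause unless success # processing failed: pause, then retry
  return :keep_offset if manual_pause      # don't auto-commit past a manual pause

  :mark_last_as_consumed                   # happy path: commit the last message
end

puts after_consume_decision(revoked: false, success: true, manual_pause: false)
# => mark_last_as_consumed
puts after_consume_decision(revoked: false, success: false, manual_pause: false)
# => retry_after_pause
```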
#handle_before_consume ⇒ Object
Increments the number of attempts
# File 'lib/karafka/processing/strategies/default.rb', line 85

def handle_before_consume
  coordinator.pause_tracker.increment
end
#handle_before_enqueue ⇒ Object
No actions needed for the standard flow here
# File 'lib/karafka/processing/strategies/default.rb', line 80

def handle_before_enqueue
  nil
end
#handle_consume ⇒ Object
Run the user consumption code
# File 'lib/karafka/processing/strategies/default.rb', line 90

def handle_consume
  Karafka.monitor.instrument('consumer.consume', caller: self)
  Karafka.monitor.instrument('consumer.consumed', caller: self) do
    consume
  end

  # Mark job as successful
  coordinator.success!(self)
rescue StandardError => e
  coordinator.failure!(self, e)

  # Re-raise so reported in the consumer
  raise e
ensure
  # We need to decrease number of jobs that this coordinator coordinates as it has finished
  coordinator.decrement
end
#handle_idle ⇒ Object
Code that should run on idle runs, when no messages are available
# File 'lib/karafka/processing/strategies/default.rb', line 130

def handle_idle
  nil
end
#handle_revoked ⇒ Object
We always need to un-pause the processing in case we have lost a given partition. Otherwise, the underlying librdkafka would not know we may want to continue processing, and the pause could in theory last forever.
# File 'lib/karafka/processing/strategies/default.rb', line 137

def handle_revoked
  resume

  coordinator.revoke

  Karafka.monitor.instrument('consumer.revoke', caller: self)
  Karafka.monitor.instrument('consumer.revoked', caller: self) do
    revoked
  end
end
#handle_shutdown ⇒ Object
Runs the shutdown code
# File 'lib/karafka/processing/strategies/default.rb', line 149

def handle_shutdown
  Karafka.monitor.instrument('consumer.shutting_down', caller: self)
  Karafka.monitor.instrument('consumer.shutdown', caller: self) do
    shutdown
  end
end
#mark_as_consumed(message) ⇒ Boolean
We keep track of this offset in case we marked a message as consumed and then got an error when processing another message. In a case like this, we do not pause on the message we've already processed but rather on the next one. This applies to both the sync and async versions of this method.
Marks message as consumed in an async way.
# File 'lib/karafka/processing/strategies/default.rb', line 26

def mark_as_consumed(message)
  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?
  return revoked? unless client.mark_as_consumed(message)

  coordinator.seek_offset = message.offset + 1

  true
end
#mark_as_consumed!(message) ⇒ Boolean
Marks message as consumed in a sync way.
# File 'lib/karafka/processing/strategies/default.rb', line 42

def mark_as_consumed!(message)
  # Ignore earlier offsets than the one we already committed
  return true if coordinator.seek_offset > message.offset
  return false if revoked?
  return revoked? unless client.mark_as_consumed!(message)

  coordinator.seek_offset = message.offset + 1

  true
end
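The "ignore earlier offsets" guard shared by both marking methods can be illustrated with a self-contained sketch. `SketchCoordinator` and the free-standing `mark_offset` function below are hypothetical simplifications; the real methods also consult the client and revocation state:

```ruby
# Hypothetical stand-in: seek_offset tracks the NEXT offset to be consumed,
# so any offset below it was already committed.
SketchCoordinator = Struct.new(:seek_offset)

def mark_offset(coordinator, offset)
  # Already committed earlier: acknowledge as a successful no-op
  return true if coordinator.seek_offset > offset

  # (the real code checks revocation and delegates to the client here)
  coordinator.seek_offset = offset + 1
  true
end

coordinator = SketchCoordinator.new(10)
puts mark_offset(coordinator, 5)  # => true (no-op, offset 5 already committed)
puts coordinator.seek_offset      # => 10 (unchanged)
puts mark_offset(coordinator, 12) # => true
puts coordinator.seek_offset      # => 13 (next offset after the marked one)
```

The `+ 1` is the detail the overview note relies on: after a failure, processing resumes from `seek_offset`, which is one past the last successfully marked message.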