Class: Karafka::Processing::Coordinator
- Inherits: Object
- Extended by: Forwardable
- Includes: Core::Helpers::Time
- Defined in: lib/karafka/processing/coordinator.rb
Overview
Basic coordinator that allows us to provide coordination objects into consumers.
This is a wrapping layer to simplify management of the work to be handled around consumption.
This coordinator needs to be thread safe. Some operations are performed only in the listener thread, but we go with thread-safe by default so that nobody has to worry about potential future mistakes.
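The snippet below is a minimal sketch of how such a coordinator is typically used around a single consumption batch. The topic, partition, pause_tracker, messages and consumer objects are hypothetical placeholders, and the flow is simplified compared to the real framework internals.

# Minimal sketch of the coordinator lifecycle around one consumption batch.
# `topic`, `partition`, `pause_tracker`, `messages` and `consumer` are placeholders.
coordinator = Karafka::Processing::Coordinator.new(topic, partition, pause_tracker)

coordinator.start(messages)
coordinator.increment(:consume)

begin
  consumer.consume
  coordinator.success!(consumer)
rescue StandardError => e
  coordinator.failure!(consumer, e)
ensure
  coordinator.decrement(:consume)
end

coordinator.success? # => true only when no :consume jobs run and none failed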
Direct Known Subclasses
Instance Attribute Summary
- #eofed ⇒ Object
  This can be set directly on the listener because it can be triggered on first run without any messages.
- #last_polled_at ⇒ Object
  Last polled at time, set based on the incoming last poll time.
- #partition ⇒ Object (readonly)
  Returns the value of attribute partition.
- #pause_tracker ⇒ Object (readonly)
  Returns the value of attribute pause_tracker.
- #seek_offset ⇒ Object
  Returns the value of attribute seek_offset.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Instance Method Summary
- #consumption(consumer) ⇒ Karafka::Processing::Result
  Result object which we can use to indicate consumption processing state.
- #decrement(job_type) ⇒ Object
  Decrements number of jobs we handle at the moment.
- #eofed? ⇒ Boolean
  Did we reach end of partition when polling data.
- #failure!(consumer, error) ⇒ Object
  Mark given consumption on consumer as failed.
- #failure? ⇒ Boolean
  True if any of the work we were running failed.
- #increment(job_type) ⇒ Object
  Increases number of jobs that we handle with this coordinator.
- #initialize(topic, partition, pause_tracker) ⇒ Coordinator (constructor)
  A new instance of Coordinator.
- #manual_pause ⇒ Object
  Store in the coordinator the info that this pause was done manually by the end user and not by the system itself.
- #manual_pause? ⇒ Boolean
  Are we in a pause that was initiated by the user.
- #manual_seek ⇒ Object
  Marks seek as manual for coordination purposes.
- #manual_seek? ⇒ Boolean
  Did a user invoke seek in the current operations scope.
- #marked? ⇒ Boolean
  Was the new seek offset assigned at least once.
- #revoke ⇒ Object
  Marks given coordinator for processing group as revoked.
- #revoked? ⇒ Boolean
  Is the partition we are processing revoked or not.
- #start(messages) ⇒ Object
  Starts the coordinator for given consumption jobs.
- #success!(consumer) ⇒ Object
  Mark given consumption on consumer as successful.
- #success? ⇒ Boolean
  Is all the consumption done and finished successfully for this coordinator. We do not say we’re successful until all work is done, because running work may still crash.
- #synchronize(&block) ⇒ Object
  Allows running synchronized (locked) code that can operate only from a given thread.
Constructor Details
#initialize(topic, partition, pause_tracker) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/processing/coordinator.rb', line 30
def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = Hash.new { |h, k| h[k] = 0 }
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
  @eofed = false
  @changed_at = monotonic_now
  @last_polled_at = @changed_at
end
Instance Attribute Details
#eofed ⇒ Object
This can be set directly on the listener because it can be triggered on first run without any messages.
# File 'lib/karafka/processing/coordinator.rb', line 20
def eofed
  @eofed
end
#last_polled_at ⇒ Object
Last polled at time, set based on the incoming last poll time.
# File 'lib/karafka/processing/coordinator.rb', line 23
def last_polled_at
  @last_polled_at
end
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
# File 'lib/karafka/processing/coordinator.rb', line 16
def partition
  @partition
end
#pause_tracker ⇒ Object (readonly)
Returns the value of attribute pause_tracker.
# File 'lib/karafka/processing/coordinator.rb', line 16
def pause_tracker
  @pause_tracker
end
#seek_offset ⇒ Object
Returns the value of attribute seek_offset.
# File 'lib/karafka/processing/coordinator.rb', line 16
def seek_offset
  @seek_offset
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
# File 'lib/karafka/processing/coordinator.rb', line 16
def topic
  @topic
end
Instance Method Details
#consumption(consumer) ⇒ Karafka::Processing::Result
Returns result object which we can use to indicate consumption processing state.
# File 'lib/karafka/processing/coordinator.rb', line 195
def consumption(consumer)
  @consumptions[consumer] ||= Processing::Result.new
end
#decrement(job_type) ⇒ Object
Decrements number of jobs we handle at the moment
# File 'lib/karafka/processing/coordinator.rb', line 94
def decrement(job_type)
  synchronize do
    @running_jobs[job_type] -= 1
    @changed_at = monotonic_now

    return @running_jobs[job_type] unless @running_jobs[job_type].negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
#eofed? ⇒ Boolean
Returns did we reach end of partition when polling data.
# File 'lib/karafka/processing/coordinator.rb', line 159
def eofed?
  @eofed
end
#failure!(consumer, error) ⇒ Object
Mark given consumption on consumer as failed
# File 'lib/karafka/processing/coordinator.rb', line 128
def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end
#failure? ⇒ Boolean
Returns true if any of work we were running failed.
# File 'lib/karafka/processing/coordinator.rb', line 136
def failure?
  @failure
end
#increment(job_type) ⇒ Object
Increases number of jobs that we handle with this coordinator
# File 'lib/karafka/processing/coordinator.rb', line 85
def increment(job_type)
  synchronize do
    @running_jobs[job_type] += 1
    @changed_at = monotonic_now
  end
end
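As a hedged illustration (the :consume job type is just an example), increments and decrements are expected to pair up per job type, and the counter must never drop below zero:

coordinator.increment(:consume)
coordinator.increment(:consume)

coordinator.decrement(:consume) # => 1 running job of this type left
coordinator.decrement(:consume) # => 0
# A further decrement would raise Karafka::Errors::InvalidCoordinatorStateError,
# because the counter would otherwise go negative.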
#manual_pause ⇒ Object
Store in the coordinator the info that this pause was done manually by the end user and not by the system itself.
# File 'lib/karafka/processing/coordinator.rb', line 173
def manual_pause
  @manual_pause = true
end
#manual_pause? ⇒ Boolean
Returns are we in a pause that was initiated by the user.
# File 'lib/karafka/processing/coordinator.rb', line 178
def manual_pause?
  paused? && @manual_pause
end
#manual_seek ⇒ Object
Marks seek as manual for coordination purposes
# File 'lib/karafka/processing/coordinator.rb', line 183
def manual_seek
  @manual_seek = true
end
#manual_seek? ⇒ Boolean
Returns did a user invoke seek in the current operations scope.
# File 'lib/karafka/processing/coordinator.rb', line 188
def manual_seek?
  @manual_seek
end
#marked? ⇒ Boolean
Returns was the new seek offset assigned at least once. This is needed because by default we assign seek offset of a first message ever, however this is insufficient for DLQ in a scenario where the first message would be broken. We would never move out of it and would end up in an endless loop.
# File 'lib/karafka/processing/coordinator.rb', line 167
def marked?
  @marked
end
#revoke ⇒ Object
Marks given coordinator for processing group as revoked
This is invoked in two places:
- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails
This means we can end up with the consumer being aware that it was revoked before the listener loop dispatches the revocation job. That is ok, as effectively nothing will be processed until the revocation jobs are done.
# File 'lib/karafka/processing/coordinator.rb', line 149
def revoke
  synchronize { @revoked = true }
end
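A hypothetical consumer-side loop might use the revocation flag to stop working on a partition it no longer owns; process here is a placeholder for user logic:

messages.each do |message|
  # Stop early once the coordinator reports the partition as revoked
  break if coordinator.revoked?

  process(message)
end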
#revoked? ⇒ Boolean
Returns is the partition we are processing revoked or not.
# File 'lib/karafka/processing/coordinator.rb', line 154
def revoked?
  @revoked
end
#start(messages) ⇒ Object
Starts the coordinator for given consumption jobs
# File 'lib/karafka/processing/coordinator.rb', line 50
def start(messages)
  @failure = false
  @running_jobs[:consume] = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false
  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because then the offset setting
  # should be up to the consumers logic (our or the end user)
  # Seek offset needs to be always initialized as for case where manual offset management
  # is turned on, we need to have reference to the first offset even in case of running
  # multiple batches without marking any messages as consumed. Rollback needs to happen to
  # the last place we know of or the last message + 1 that was marked
  #
  # It is however worth keeping in mind, that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
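A small sketch of the seek offset bookkeeping across two batches; batch_one, batch_two and their offsets are made-up examples:

coordinator.start(batch_one)   # first message offset is, say, 100
coordinator.seek_offset        # => 100
coordinator.start(batch_two)   # first message offset is, say, 150
coordinator.seek_offset        # => still 100, `||=` keeps the first known offset
coordinator.seek_offset = 151  # consumer logic may move it forward after marking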
#success!(consumer) ⇒ Object
Mark given consumption on consumer as successful
# File 'lib/karafka/processing/coordinator.rb', line 119
def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end
#success? ⇒ Boolean
This is only used for consume synchronization
Is all the consumption done and finished successfully for this coordinator. We do not say we’re successful until all work is done, because running work may still crash.
# File 'lib/karafka/processing/coordinator.rb', line 111
def success?
  synchronize do
    @running_jobs[:consume].zero? && @consumptions.values.all?(&:success?)
  end
end
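A hedged example of how the state resolves; consumer_a and consumer_b are hypothetical consumer instances handled by this coordinator:

coordinator.increment(:consume)
coordinator.success!(consumer_a)
coordinator.decrement(:consume)
coordinator.success?  # => true, no running :consume jobs and all consumptions succeeded

coordinator.increment(:consume)
coordinator.failure!(consumer_b, StandardError.new('boom'))
coordinator.decrement(:consume)
coordinator.success?  # => false
coordinator.failure?  # => true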
#synchronize(&block) ⇒ Object
We check if the mutex is not already owned by the current thread, so we won’t end up with a deadlock in case the user runs coordinated code from inside their own lock.
This is internal and should not be used to synchronize user-facing code. Otherwise the user could indirectly cause deadlocks or prolonged locks by running their own logic. It can and should, however, be used for multi-thread strategy applications and other internal operation locks.
Allows running synchronized (locked) code that can operate only from a given thread.
# File 'lib/karafka/processing/coordinator.rb', line 210
def synchronize(&block)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&block)
  end
end
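For illustration, nesting #synchronize from the same thread does not deadlock, because an already owned mutex is not re-locked:

coordinator.synchronize do
  coordinator.synchronize do
    # internal, multi-threading related work would go here
  end
end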