Class: Karafka::Processing::Coordinator
- Inherits: Object
- Defined in: lib/karafka/processing/coordinator.rb
Overview
This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but all operations are thread-safe by default so that we do not have to worry about potential future mistakes.
Basic coordinator that allows us to provide coordination objects into consumers.
This is a wrapping layer to simplify management of work to be handled around consumption.
Direct Known Subclasses
Instance Attribute Summary
-
#partition ⇒ Object
readonly
Returns the value of attribute partition.
-
#pause_tracker ⇒ Object
readonly
Returns the value of attribute pause_tracker.
-
#seek_offset ⇒ Object
Returns the value of attribute seek_offset.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary
-
#decrement ⇒ Object
Decrements number of jobs we handle at the moment.
-
#failure!(consumer, error) ⇒ Object
Mark given consumption on consumer as failed.
-
#failure? ⇒ Boolean
True if any of the work we were running failed.
-
#increment ⇒ Object
Increases number of jobs that we handle with this coordinator.
-
#initialize(topic, partition, pause_tracker) ⇒ Coordinator
constructor
A new instance of Coordinator.
-
#manual_pause ⇒ Object
Stores in the coordinator the information that this pause was initiated manually by the end user and not by the system itself.
-
#manual_pause? ⇒ Boolean
Whether we are in a pause that was initiated by the user.
-
#manual_seek ⇒ Object
Marks seek as manual for coordination purposes.
-
#manual_seek? ⇒ Boolean
Whether a user invoked seek in the current operations scope.
-
#marked? ⇒ Boolean
Whether the new seek offset was assigned at least once.
-
#revoke ⇒ Object
Marks given coordinator for processing group as revoked.
-
#revoked? ⇒ Boolean
Whether the partition we are processing is revoked.
-
#start(messages) ⇒ Object
Starts the coordinator for given consumption jobs.
-
#success!(consumer) ⇒ Object
Mark given consumption on consumer as successful.
-
#success? ⇒ Boolean
Whether all the consumption is done and finished successfully for this coordinator. We do not say we’re successful until all work is done, because running work may still crash.
-
#synchronize(&block) ⇒ Object
Allows running synchronized (locked) code that can operate in between virtual partitions.
Constructor Details
#initialize(topic, partition, pause_tracker) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/processing/coordinator.rb', line 18

def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = 0
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
end
Instance Attribute Details
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
# File 'lib/karafka/processing/coordinator.rb', line 13

def partition
  @partition
end
#pause_tracker ⇒ Object (readonly)
Returns the value of attribute pause_tracker.
# File 'lib/karafka/processing/coordinator.rb', line 13

def pause_tracker
  @pause_tracker
end
#seek_offset ⇒ Object
Returns the value of attribute seek_offset.
# File 'lib/karafka/processing/coordinator.rb', line 13

def seek_offset
  @seek_offset
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
# File 'lib/karafka/processing/coordinator.rb', line 13

def topic
  @topic
end
Instance Method Details
#decrement ⇒ Object
Decrements number of jobs we handle at the moment.

# File 'lib/karafka/processing/coordinator.rb', line 74

def decrement
  synchronize do
    @running_jobs -= 1

    return @running_jobs unless @running_jobs.negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
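The counting behaviour of `#increment` and `#decrement` can be sketched outside of Karafka as follows. `JobCounter` and `JobCounter::InvalidStateError` are hypothetical names used only for this illustration; the real coordinator raises `Karafka::Errors::InvalidCoordinatorStateError` and keeps the counter alongside its other state.

```ruby
# Hypothetical stand-in for the counting part of the coordinator (not Karafka code).
class JobCounter
  # Illustrative error class; the real coordinator raises
  # Karafka::Errors::InvalidCoordinatorStateError instead.
  InvalidStateError = Class.new(StandardError)

  def initialize
    @running_jobs = 0
    @mutex = Mutex.new
  end

  # Registers one more running job and returns the new count
  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  # Unregisters one job; going below zero means callers are out of sync
  def decrement
    @mutex.synchronize do
      @running_jobs -= 1
      raise InvalidStateError, 'Was zero before decrementation' if @running_jobs.negative?

      @running_jobs
    end
  end
end

counter = JobCounter.new
counter.increment
counter.increment
after_first = counter.decrement
after_second = counter.decrement

# A third decrement has no matching increment, so it should raise
underflow_detected =
  begin
    counter.decrement
    false
  rescue JobCounter::InvalidStateError
    true
  end
```

Raising on a negative count, rather than clamping at zero, surfaces an unbalanced increment/decrement pair immediately instead of hiding it.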
#failure!(consumer, error) ⇒ Object
Mark given consumption on consumer as failed.

# File 'lib/karafka/processing/coordinator.rb', line 106

def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end
#failure? ⇒ Boolean
Returns true if any of the work we were running failed.

# File 'lib/karafka/processing/coordinator.rb', line 114

def failure?
  @failure
end
#increment ⇒ Object
Increases number of jobs that we handle with this coordinator.

# File 'lib/karafka/processing/coordinator.rb', line 69

def increment
  synchronize { @running_jobs += 1 }
end
#manual_pause ⇒ Object
Stores in the coordinator the information that this pause was initiated manually by the end user and not by the system itself.

# File 'lib/karafka/processing/coordinator.rb', line 146

def manual_pause
  @manual_pause = true
end
#manual_pause? ⇒ Boolean
Returns whether we are in a pause that was initiated by the user.

# File 'lib/karafka/processing/coordinator.rb', line 151

def manual_pause?
  @pause_tracker.paused? && @manual_pause
end
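Because `#manual_pause?` combines the pause tracker state with the manual flag, the flag alone is not enough for it to return true. The following is a minimal sketch of that interaction; `SketchPauseTracker` and `PauseFlags` are hypothetical classes written for this example, and only the `paused?` call is taken from the source above.

```ruby
# Hypothetical pause tracker exposing only what this sketch needs.
class SketchPauseTracker
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# Hypothetical holder mirroring the manual pause logic shown above.
class PauseFlags
  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
  end

  def manual_pause
    @manual_pause = true
  end

  # True only when the tracker is paused AND the pause was flagged as manual
  def manual_pause?
    @pause_tracker.paused? && @manual_pause
  end
end

tracker = SketchPauseTracker.new
flags = PauseFlags.new(tracker)

tracker.pause
system_pause = flags.manual_pause? # paused, but not flagged as manual

flags.manual_pause
user_pause = flags.manual_pause?   # paused and flagged as manual

tracker.resume
after_resume = flags.manual_pause? # flag still set, but no longer paused
```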
#manual_seek ⇒ Object
Marks seek as manual for coordination purposes.

# File 'lib/karafka/processing/coordinator.rb', line 156

def manual_seek
  @manual_seek = true
end
#manual_seek? ⇒ Boolean
Returns whether a user invoked seek in the current operations scope.

# File 'lib/karafka/processing/coordinator.rb', line 161

def manual_seek?
  @manual_seek
end
#marked? ⇒ Boolean
Returns whether the new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the very first message; however, this is insufficient for the DLQ in a scenario where the first message is broken. We would never move past it and would end up in an endless loop.

# File 'lib/karafka/processing/coordinator.rb', line 140

def marked?
  @marked
end
#revoke ⇒ Object
Marks the given coordinator for the processing group as revoked.
This is invoked in two places:
- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails
This means that a consumer can become aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.

# File 'lib/karafka/processing/coordinator.rb', line 127

def revoke
  synchronize { @revoked = true }
end
#revoked? ⇒ Boolean
Returns whether the partition we are processing is revoked.

# File 'lib/karafka/processing/coordinator.rb', line 132

def revoked?
  @revoked
end
#start(messages) ⇒ Object
Starts the coordinator for given consumption jobs.

# File 'lib/karafka/processing/coordinator.rb', line 35

def start(messages)
  @failure = false
  @running_jobs = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false

  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because then the offset setting
  # should be up to the consumers logic (our or the end user)
  # Seek offset needs to be always initialized as for case where manual offset management
  # is turned on, we need to have reference to the first offset even in case of running
  # multiple batches without marking any messages as consumed. Rollback needs to happen to
  # the last place we know of or the last message + 1 that was marked
  #
  # It is however worth keeping in mind, that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
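The `||=` assignment at the end of `#start` means the seek offset is taken from the first batch only and is left alone on later runs unless something else assigns it. A small sketch of that behaviour, assuming a hypothetical `SketchMessage` struct and `SeekOffsetHolder` class that are not part of Karafka:

```ruby
# Hypothetical message type carrying only an offset.
SketchMessage = Struct.new(:offset)

# Hypothetical holder mirroring only the seek offset part of #start.
class SeekOffsetHolder
  attr_accessor :seek_offset

  def start(messages)
    # Assigned only when no seek offset has been set yet
    @seek_offset ||= messages.first.offset
  end
end

holder = SeekOffsetHolder.new

holder.start([SketchMessage.new(10), SketchMessage.new(11)])
first_run = holder.seek_offset

# A later batch does not overwrite the existing seek offset
holder.start([SketchMessage.new(12)])
second_run = holder.seek_offset

# An explicit assignment (for example after marking) is preserved as well
holder.seek_offset = 13
holder.start([SketchMessage.new(13)])
third_run = holder.seek_offset
```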
#success!(consumer) ⇒ Object
Mark given consumption on consumer as successful.

# File 'lib/karafka/processing/coordinator.rb', line 97

def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end
#success? ⇒ Boolean
Returns whether all the consumption is done and finished successfully for this coordinator. We do not say we’re successful until all work is done, because running work may still crash.

# File 'lib/karafka/processing/coordinator.rb', line 89

def success?
  synchronize do
    @running_jobs.zero? && @consumptions.values.all?(&:success?)
  end
end
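The two conditions in `#success?` (no running jobs, and every recorded consumption successful) can be illustrated with a self-contained sketch. `SketchResult`, `SuccessCheck` and its `finish` method are hypothetical simplifications; in the real coordinator the results are recorded through `#success!` and `#failure!`, and the counter is maintained by `#increment` and `#decrement`.

```ruby
# Hypothetical per-consumer result, reduced to a success flag.
SketchResult = Struct.new(:success) do
  def success?
    success
  end
end

# Hypothetical class mirroring only the success check.
class SuccessCheck
  def initialize
    @running_jobs = 0
    @consumptions = {}
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  # Records the outcome for a consumer and marks its job as no longer running
  def finish(consumer_id, success)
    @mutex.synchronize do
      @consumptions[consumer_id] = SketchResult.new(success)
      @running_jobs -= 1
    end
  end

  def success?
    @mutex.synchronize do
      @running_jobs.zero? && @consumptions.values.all?(&:success?)
    end
  end
end

check = SuccessCheck.new
check.increment
check.increment

check.finish(:a, true)
while_running = check.success? # one job is still running

check.finish(:b, true)
all_done = check.success?      # no running jobs, all results successful

check.increment
check.finish(:c, false)
with_failure = check.success?  # a failed result makes the whole run unsuccessful
```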
#synchronize(&block) ⇒ Object
Allows running synchronized (locked) code that can operate in between virtual partitions.

# File 'lib/karafka/processing/coordinator.rb', line 167

def synchronize(&block)
  @mutex.synchronize(&block)
end
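As a rough illustration of why a shared lock matters when several threads touch the same state, the sketch below forwards a block to `Mutex#synchronize` in the same way. `SharedState` and its `add` method are hypothetical and exist only for this example.

```ruby
# Hypothetical object sharing one mutex between all callers.
class SharedState
  attr_reader :total

  def initialize
    @mutex = Mutex.new
    @total = 0
  end

  # Runs the given block while holding the lock and returns the block's value
  def synchronize(&block)
    @mutex.synchronize(&block)
  end

  def add(number)
    synchronize { @total += number }
  end
end

state = SharedState.new

# Several threads update the same counter through the shared lock
threads = Array.new(4) do
  Thread.new { 1_000.times { state.add(1) } }
end
threads.each(&:join)

final_total = state.total
block_value = state.synchronize { :done }
```

Note that Ruby's `Mutex` is not reentrant: calling `synchronize` again from inside a block that already holds the same lock raises `ThreadError`.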