Class: Karafka::Processing::Coordinator
- Inherits: Object
  - Object
  - Karafka::Processing::Coordinator
- Defined in: lib/karafka/processing/coordinator.rb
Overview
This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but every operation is made thread-safe by default to guard against potential future mistakes.
Basic coordinator that allows us to provide coordination objects into consumers.
This is a wrapping layer to simplify management of work to be handled around consumption.
Instance Attribute Summary
- #pause_tracker ⇒ Karafka::TimeTrackers::Pause (readonly)
- #seek_offset ⇒ Object
  Returns the value of attribute seek_offset.
Instance Method Summary
- #consumption(consumer) ⇒ Karafka::Processing::Result
  Result object which we can use to indicate the consumption processing state.
- #decrement ⇒ Object
  Decrements the number of jobs we handle at the moment.
- #increment ⇒ Object
  Increases the number of jobs that we handle with this coordinator.
- #initialize(pause_tracker) ⇒ Coordinator (constructor)
  A new instance of Coordinator.
- #manual_pause ⇒ Object
  Stores in the coordinator the information that this pause was done manually by the end user and not by the system itself.
- #manual_pause? ⇒ Boolean
  Are we in a pause that was initiated by the user?
- #revoke ⇒ Object
  Marks the given coordinator for the processing group as revoked.
- #revoked? ⇒ Boolean
  Is the partition we are processing revoked or not?
- #start(messages) ⇒ Object
  Starts the coordinator for the given consumption jobs.
- #success? ⇒ Boolean
  Is all the consumption done and finished successfully for this coordinator?
Constructor Details
#initialize(pause_tracker) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/processing/coordinator.rb', line 19

def initialize(pause_tracker)
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = 0
  @manual_pause = false
  @mutex = Mutex.new
end
Instance Attribute Details
#pause_tracker ⇒ Karafka::TimeTrackers::Pause (readonly)
# File 'lib/karafka/processing/coordinator.rb', line 14

def pause_tracker
  @pause_tracker
end
#seek_offset ⇒ Object
Returns the value of attribute seek_offset.
# File 'lib/karafka/processing/coordinator.rb', line 16

def seek_offset
  @seek_offset
end
Instance Method Details
#consumption(consumer) ⇒ Karafka::Processing::Result
Returns the result object that we can use to indicate the consumption processing state.
# File 'lib/karafka/processing/coordinator.rb', line 77

def consumption(consumer)
  @mutex.synchronize do
    @consumptions[consumer] ||= Processing::Result.new
  end
end
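The memoization in #consumption can be illustrated without Karafka. What follows is a minimal sketch, assuming a hypothetical `ResultRegistry` class and a `StubResult` struct that stands in for `Karafka::Processing::Result`; neither name exists in Karafka.

```ruby
# Hypothetical stand-in for Karafka::Processing::Result (not the real class)
StubResult = Struct.new(:success)

# Hypothetical registry that mirrors the `||=` memoization used in #consumption
class ResultRegistry
  def initialize
    @results = {}
    @mutex = Mutex.new
  end

  # Returns the same result object for the same consumer on every call
  def consumption(consumer)
    @mutex.synchronize do
      @results[consumer] ||= StubResult.new(true)
    end
  end
end

registry = ResultRegistry.new
consumer_a = Object.new
consumer_b = Object.new

# Repeated lookups for one consumer yield the identical object
registry.consumption(consumer_a).equal?(registry.consumption(consumer_a))

# A different consumer gets its own result object
registry.consumption(consumer_a).equal?(registry.consumption(consumer_b))
```

Because the lookup and the assignment happen under the same mutex, two threads asking for the result of one consumer should not end up with two different objects.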
#decrement ⇒ Object
Decrements the number of jobs we handle at the moment.
# File 'lib/karafka/processing/coordinator.rb', line 62

def decrement
  @mutex.synchronize do
    @running_jobs -= 1

    return @running_jobs unless @running_jobs.negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
#increment ⇒ Object
Increases the number of jobs that we handle with this coordinator.
# File 'lib/karafka/processing/coordinator.rb', line 57

def increment
  @mutex.synchronize { @running_jobs += 1 }
end
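The interplay of #increment and #decrement can be tried in isolation. The sketch below assumes a hypothetical `JobCounter` class with its own `InvalidStateError` in place of `Karafka::Errors::InvalidCoordinatorStateError`; it copies only the counter logic shown above and is not Karafka code.

```ruby
# Hypothetical counter mirroring the guarded increment/decrement pair
class JobCounter
  # Hypothetical error used instead of Karafka::Errors::InvalidCoordinatorStateError
  InvalidStateError = Class.new(StandardError)

  def initialize
    @running_jobs = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  # Returns the remaining number of jobs, or raises when the counter drops below zero
  def decrement
    @mutex.synchronize do
      @running_jobs -= 1

      return @running_jobs unless @running_jobs.negative?

      raise InvalidStateError, 'Was zero before decrementation'
    end
  end
end

counter = JobCounter.new

# Several threads register jobs concurrently; the mutex keeps the count exact
threads = Array.new(4) { Thread.new { 100.times { counter.increment } } }
threads.each(&:join)

# Finishing every registered job brings the counter back to zero
remaining = nil
400.times { remaining = counter.decrement }

# One decrement too many signals an out-of-sync state
begin
  counter.decrement
rescue JobCounter::InvalidStateError => e
  e.message
end
```

The early `return` inside `synchronize` still releases the lock, since `Mutex#synchronize` unlocks when the block exits for any reason.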
#manual_pause ⇒ Object
Stores in the coordinator the information that this pause was done manually by the end user and not by the system itself.
# File 'lib/karafka/processing/coordinator.rb', line 108

def manual_pause
  @mutex.synchronize { @manual_pause = true }
end
#manual_pause? ⇒ Boolean
Returns whether we are in a pause that was initiated by the user.
# File 'lib/karafka/processing/coordinator.rb', line 113

def manual_pause?
  @pause_tracker.paused? && @manual_pause
end
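Note that #manual_pause? combines two conditions: the manual flag and the current state of the pause tracker. The following sketch makes that visible. It assumes a hypothetical `StubPauseTracker` with just `pause`, `resume` and `paused?`, and a hypothetical `PauseFlagHolder` that copies the two methods above; the real `Karafka::TimeTrackers::Pause` has more behavior than this stub.

```ruby
# Hypothetical pause tracker exposing only what this sketch needs
class StubPauseTracker
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# Hypothetical holder mirroring #manual_pause and #manual_pause?
class PauseFlagHolder
  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
    @mutex = Mutex.new
  end

  def manual_pause
    @mutex.synchronize { @manual_pause = true }
  end

  def manual_pause?
    @pause_tracker.paused? && @manual_pause
  end
end

tracker = StubPauseTracker.new
holder = PauseFlagHolder.new(tracker)

holder.manual_pause
holder.manual_pause? # false: the flag is set, but the tracker is not paused

tracker.pause
holder.manual_pause? # true: paused and flagged as manual

tracker.resume
holder.manual_pause? # false again once the pause is over
```

In this sketch the flag alone is never enough, so a stale flag left over after a pause has ended does not report a manual pause.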
#revoke ⇒ Object
Marks the given coordinator for the processing group as revoked.
This is invoked in two places:
- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails
This means we can end up with the consumer being aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.
# File 'lib/karafka/processing/coordinator.rb', line 97

def revoke
  @mutex.synchronize { @revoked = true }
end
#revoked? ⇒ Boolean
Returns whether the partition we are processing is revoked.
# File 'lib/karafka/processing/coordinator.rb', line 102

def revoked?
  @revoked
end
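Since #revoke may be called both by the listener loop and by a consumer, it helps that the operation is idempotent. A minimal sketch, assuming a hypothetical `RevocationFlag` class that copies only the two methods above, with two plain threads standing in for the two callers:

```ruby
# Hypothetical flag object mirroring #revoke and #revoked?
class RevocationFlag
  def initialize
    @revoked = false
    @mutex = Mutex.new
  end

  def revoke
    @mutex.synchronize { @revoked = true }
  end

  def revoked?
    @revoked
  end
end

flag = RevocationFlag.new

# Two hypothetical callers: the listener loop and a consumer whose checkpointing failed
callers = [Thread.new { flag.revoke }, Thread.new { flag.revoke }]
callers.each(&:join)

# The order of the two calls does not matter; the flag ends up set either way
flag.revoked?
```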
#start(messages) ⇒ Object
Starts the coordinator for the given consumption jobs.
# File 'lib/karafka/processing/coordinator.rb', line 31

def start(messages)
  @mutex.synchronize do
    @running_jobs = 0
    # We need to clear the consumption results hash here, otherwise we could end up storing
    # consumption results of consumer instances we no longer control
    @consumptions.clear

    # When starting to run, no pause is expected and no manual pause as well
    @manual_pause = false

    # We set it on the first encounter and never again, because then the offset setting
    # should be up to the consumers logic (our or the end user)
    # Seek offset needs to be always initialized as for case where manual offset management
    # is turned on, we need to have reference to the first offset even in case of running
    # multiple batches without marking any messages as consumed. Rollback needs to happen to
    # the last place we know of or the last message + 1 that was marked
    @seek_offset ||= messages.first.offset
  end
end
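The "set on the first encounter and never again" behavior of the seek offset comes from the `||=` assignment. The sketch below isolates it, assuming a hypothetical `OffsetHolder` class and a `StubMessage` struct with only an `offset` field; real Karafka messages carry far more data.

```ruby
# Hypothetical message exposing only the offset
StubMessage = Struct.new(:offset)

# Hypothetical holder mirroring the seek offset handling in #start
class OffsetHolder
  attr_accessor :seek_offset

  def initialize
    @mutex = Mutex.new
  end

  def start(messages)
    @mutex.synchronize do
      # Assigned only while still nil, so later batches leave it untouched
      @seek_offset ||= messages.first.offset
    end
  end
end

holder = OffsetHolder.new

holder.start([StubMessage.new(10), StubMessage.new(11)])
holder.seek_offset # 10, taken from the first message of the first batch

holder.start([StubMessage.new(12)])
holder.seek_offset # still 10, since nothing moved the offset in between

# Anything that assigns the offset explicitly takes over from here
holder.seek_offset = 13
holder.start([StubMessage.new(20)])
holder.seek_offset # 13
```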
#success? ⇒ Boolean
Returns whether all the consumption is done and finished successfully for this coordinator.
# File 'lib/karafka/processing/coordinator.rb', line 84

def success?
  @mutex.synchronize { @running_jobs.zero? && @consumptions.values.all?(&:success?) }
end
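The success check has two parts: no jobs may still be running, and every stored result must report success. A small sketch of that predicate follows, assuming a hypothetical `CompletionCheck` class and a `StubOutcome` struct in place of `Karafka::Processing::Result`; the constructor arguments are an illustration device only.

```ruby
# Hypothetical result stand-in that answers success?
StubOutcome = Struct.new(:ok) do
  def success?
    ok
  end
end

# Hypothetical object mirroring the predicate used in #success?
class CompletionCheck
  def initialize(running_jobs, results)
    @running_jobs = running_jobs
    @consumptions = results
    @mutex = Mutex.new
  end

  def success?
    @mutex.synchronize do
      @running_jobs.zero? && @consumptions.values.all?(&:success?)
    end
  end
end

ok = StubOutcome.new(true)
failed = StubOutcome.new(false)

CompletionCheck.new(0, { a: ok }).success?            # true
CompletionCheck.new(1, { a: ok }).success?            # false, a job is still running
CompletionCheck.new(0, { a: ok, b: failed }).success? # false, one result failed
CompletionCheck.new(0, {}).success?                   # true, all? holds for an empty collection
```

The last case is worth keeping in mind: with no running jobs and no recorded results, the predicate as written reports success.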