Class: Karafka::Processing::JobsQueue
- Inherits: Object
  - Object
    - Karafka::Processing::JobsQueue
- Defined in: lib/karafka/processing/jobs_queue.rb
Overview
This is the key work component for Karafka jobs distribution. It provides an API for running jobs in parallel while operating within more than one subscription group.
We need to take into consideration the fact that more than one subscription group can operate on this queue; that’s why internally we keep track of processing per group.
We work under the assumption that partition data is evenly distributed.
Instance Method Summary
- #<<(job) ⇒ Object
  Adds the job to the internal main queue, scheduling it for execution in a worker, and marks the job as being in the processing pipeline.
- #clear(group_id) ⇒ Object
  Clears the processing states for a provided group.
- #close ⇒ Object
  Stops the whole processing queue.
- #complete(job) ⇒ Object
  Marks a given job from a given group as completed.
- #empty?(group_id) ⇒ Boolean
  Tells whether there is nothing in processing (or waiting to be processed) for a given group.
- #initialize ⇒ Karafka::Processing::JobsQueue
  Constructor.
- #pop ⇒ Jobs::Base?
  Waits for a job from the main queue and returns it once available, or returns nil if the queue has been stopped and there won’t be anything more to process ever.
- #size ⇒ Integer
  Returns the number of jobs that are either enqueued or in processing (but not finished).
- #statistics ⇒ Hash
  Returns a hash with:
  - `busy`: number of jobs that are currently being processed (active work)
  - `enqueued`: number of jobs in the queue that are waiting to be picked up by a worker
- #tick(group_id) ⇒ Object
  Causes the wait lock to re-check the lock conditions and potentially unlock.
- #wait(group_id) ⇒ Object
  Blocks when there are things in the queue for a given group and waits until all the blocking jobs from that group are completed.
Constructor Details
#initialize ⇒ Karafka::Processing::JobsQueue
# File 'lib/karafka/processing/jobs_queue.rb', line 14
def initialize
  @queue = Queue.new
  # Those queues will act as semaphores internally. Since we need an indicator for waiting
  # we could use Thread.pass but this is expensive. Instead we can just lock until any
  # of the workers finishes their work and we can re-check. This means that in the worst
  # scenario, we will context switch 10 times per poll instead of getting this thread
  # scheduled by Ruby hundreds of thousands of times per group.
  # We cannot use a single semaphore as it could potentially block in listeners that should
  # process with their data and also could unlock when a given group needs to remain locked
  @semaphores = Concurrent::Map.new do |h, k|
    h.compute_if_absent(k) { Queue.new }
  end
  @in_processing = Hash.new { |h, k| h[k] = [] }
  @mutex = Mutex.new
end
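The constructor builds one semaphore Queue per group on demand. The sketch below is a stdlib-only approximation, assuming a Mutex-guarded Hash as a stand-in for `Concurrent::Map#compute_if_absent`; `GroupSemaphores` and `signaled?` are hypothetical names. It shows that signaling one group leaves another group's semaphore untouched, which is why a single shared semaphore would not work.

```ruby
# Stdlib-only sketch of lazily created, per-group semaphores.
# Karafka uses Concurrent::Map#compute_if_absent; this Mutex-guarded Hash is an
# assumed stand-in with the same "create once per key" behavior.
class GroupSemaphores # hypothetical helper, not part of Karafka
  def initialize
    @mutex = Mutex.new
    @map = {}
  end

  # Returns the Queue for group_id, creating it atomically on first access
  def [](group_id)
    @mutex.synchronize { @map[group_id] ||= Queue.new }
  end
end

# Returns true if a wake-up token was waiting for the group (non-blocking check)
def signaled?(semaphores, group_id)
  semaphores[group_id].pop(true)
  true
rescue ThreadError
  false
end

semaphores = GroupSemaphores.new
semaphores[:group_a] << true # wake up only waiters of group A

a_signaled = signaled?(semaphores, :group_a)
b_signaled = signaled?(semaphores, :group_b)
same_object = semaphores[:group_a].equal?(semaphores[:group_a])

puts a_signaled  # => true
puts b_signaled  # => false
puts same_object # => true
```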
Instance Method Details
#<<(job) ⇒ Object
Adds the job to the internal main queue, scheduling it for execution in a worker, and marks the job as being in the processing pipeline.
# File 'lib/karafka/processing/jobs_queue.rb', line 43
def <<(job)
  # We do not push the job if the queue is closed as it means that it would anyhow not be
  # executed
  return if @queue.closed?
  @mutex.synchronize do
    group = @in_processing[job.group_id]
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)
    group << job
    @queue << job
  end
end
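A minimal sketch of the two guards in `#<<`, assuming a lambda in place of the method, a hypothetical `SyncError` standing in for `Errors::JobsQueueSynchronizationError`, and a hypothetical `FakeJob` type. Enqueuing the same job object twice raises, and enqueuing after the queue is closed is silently skipped.

```ruby
# Sketch of the guards in #<<: skip once the queue is closed and refuse to
# track the same job twice within its group.
class SyncError < StandardError; end # stand-in for Errors::JobsQueueSynchronizationError

FakeJob = Struct.new(:group_id, :id) # hypothetical job type

queue = Queue.new
in_processing = Hash.new { |h, k| h[k] = [] }
mutex = Mutex.new

push = lambda do |job|
  # Pushing to a closed Queue raises ClosedQueueError, and the job would never run anyway
  next if queue.closed?

  mutex.synchronize do
    group = in_processing[job.group_id]
    raise(SyncError, job.group_id.to_s) if group.include?(job)

    group << job
    queue << job
  end
end

job = FakeJob.new(:group_a, 1)
push.call(job)

duplicate_rejected =
  begin
    push.call(job)
    false
  rescue SyncError
    true
  end

queue.close
push.call(FakeJob.new(:group_a, 2)) # silently skipped: the queue is closed

puts duplicate_rejected           # => true
puts in_processing[:group_a].size # => 1
puts queue.size                   # => 1
```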
#clear(group_id) ⇒ Object
Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.
# File 'lib/karafka/processing/jobs_queue.rb', line 89
def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear
    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end
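The following sketch illustrates `#clear` with plain Ruby structures laid out like the constructor's; the `clear` lambda and `FakeJob` are illustrative assumptions. Only the targeted group's tracking is dropped, and one wake-up token is left for any waiter of that group.

```ruby
# Sketch of #clear: drop one group's in-flight tracking and "tick" that group's
# semaphore so a blocked waiter re-checks its condition.
FakeJob = Struct.new(:group_id, :id) # hypothetical job type

in_processing = Hash.new { |h, k| h[k] = [] }
semaphores = Hash.new { |h, k| h[k] = Queue.new } # single-threaded demo, so a Hash is enough
mutex = Mutex.new

in_processing[:group_a] << FakeJob.new(:group_a, 1) << FakeJob.new(:group_a, 2)
in_processing[:group_b] << FakeJob.new(:group_b, 1)

clear = lambda do |group_id|
  mutex.synchronize do
    in_processing[group_id].clear
    semaphores[group_id] << true # wake-up token for anyone waiting on this group
  end
end

clear.call(:group_a)

puts in_processing[:group_a].size # => 0
puts in_processing[:group_b].size # => 1
puts semaphores[:group_a].size    # => 1
```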
#close ⇒ Object
Stops the whole processing queue.
# File 'lib/karafka/processing/jobs_queue.rb', line 98
def close
  @mutex.synchronize do
    return if @queue.closed?
    @queue.close
    @semaphores.each_value(&:close)
  end
end
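`#close` relies on standard `Thread::Queue` semantics: closing an empty queue wakes threads blocked in `#pop` with `nil`, and pushing to a closed queue raises `ClosedQueueError`. The sketch below uses a lambda and an illustrative semaphores Hash (not Karafka's objects) to demonstrate that behavior and the idempotent guard.

```ruby
# Sketch of the Thread::Queue behavior #close relies on.
queue = Queue.new
semaphores = { group_a: Queue.new } # illustrative per-group semaphores
mutex = Mutex.new

close = lambda do
  mutex.synchronize do
    next if queue.closed?          # repeated calls become no-ops
    queue.close
    semaphores.each_value(&:close) # also release threads waiting on a group
  end
end

worker = Thread.new { queue.pop }                # blocks while the queue is empty and open
waiter = Thread.new { semaphores[:group_a].pop } # blocks like a listener in #wait

close.call
close.call # second call does nothing

late_push_rejected =
  begin
    queue << :late_job
    false
  rescue ClosedQueueError
    true
  end

puts worker.value.inspect # => nil
puts waiter.value.inspect # => nil
puts late_push_rejected   # => true
```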
#complete(job) ⇒ Object
Marks a given job from a given group as completed. When there are no more jobs from a given group to be executed, we won’t wait.
# File 'lib/karafka/processing/jobs_queue.rb', line 78
def complete(job)
  @mutex.synchronize do
    @in_processing[job.group_id].delete(job)
    tick(job.group_id)
  end
end
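A sketch of `#complete` under the same assumed layout (the `complete` lambda and `FakeJob` are illustrative): each completion removes the job from its group and pushes one token, so a waiting listener re-checks after every finished job.

```ruby
# Sketch of #complete: remove the finished job from its group's tracking and
# tick the group's semaphore.
FakeJob = Struct.new(:group_id, :id) # hypothetical job type

in_processing = Hash.new { |h, k| h[k] = [] }
semaphores = Hash.new { |h, k| h[k] = Queue.new }
mutex = Mutex.new

job1 = FakeJob.new(:group_a, 1)
job2 = FakeJob.new(:group_a, 2)
in_processing[:group_a] << job1 << job2

complete = lambda do |job|
  mutex.synchronize do
    in_processing[job.group_id].delete(job)
    semaphores[job.group_id] << true # one re-check per finished job
  end
end

complete.call(job1)
after_first = in_processing[:group_a].size
complete.call(job2)

puts after_first                    # => 1
# The group is now empty, so a waiter for group_a would stop blocking
puts in_processing[:group_a].empty? # => true
puts semaphores[:group_a].size      # => 2
```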
#empty?(group_id) ⇒ Boolean
Returns true when nothing from a given group is in processing or waiting to be processed.
# File 'lib/karafka/processing/jobs_queue.rb', line 111
def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty?
  end
end
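Because `@in_processing` is a Hash with a default block, `#empty?` also works for groups that were never seen. The sketch below uses plain Ruby and an illustrative `empty` lambda to show that querying an unknown group returns true and, as a side effect of the default block, stores an empty Array for that group.

```ruby
# Sketch of #empty? on a Hash built with a default block, as in the constructor.
in_processing = Hash.new { |h, k| h[k] = [] }
mutex = Mutex.new

empty = lambda do |group_id|
  mutex.synchronize { in_processing[group_id].empty? }
end

in_processing[:group_a] << :job # stand-in for a real job object

puts empty.call(:group_a)         # => false
puts empty.call(:unknown)         # => true
# The default block assigned an empty Array for the unknown group
puts in_processing.key?(:unknown) # => true
```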
#pop ⇒ Jobs::Base?
Note: This command is blocking and will wait until any job is available on the main queue.
Returns the next job from the main queue once available, or nil if the queue has been stopped and there won’t be anything more to process ever.
# File 'lib/karafka/processing/jobs_queue.rb', line 63
def pop
  @queue.pop
end
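A sketch of a worker loop built on the `#pop` contract, assuming plain Ruby threads rather than Karafka's actual worker class: each thread blocks on `pop` and exits once `pop` returns `nil`, which happens only after the queue is closed and its remaining jobs have been drained.

```ruby
# Sketch of consumers relying on "pop returns nil once closed and empty".
queue = Queue.new
processed = []
lock = Mutex.new

workers = 2.times.map do
  Thread.new do
    while (job = queue.pop) # nil only after close, once the queue is empty
      lock.synchronize { processed << job * 10 } # pretend "processing"
    end
  end
end

[1, 2, 3].each { |n| queue << n }
queue.close # remaining jobs are still handed out, then pop returns nil
workers.each(&:join)

puts processed.sort.inspect # => [10, 20, 30]
```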
#size ⇒ Integer
Note: Using `#pop` won’t decrease this number, as only marking a job as completed does this.
Returns the number of jobs that are either enqueued or in processing (but not finished).
# File 'lib/karafka/processing/jobs_queue.rb', line 35
def size
  @in_processing.values.map(&:size).sum
end
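The note on `#size` can be checked with a small sketch, assuming plain Ruby structures shaped like the constructor's and a hypothetical `FakeJob`: popping a job leaves the count unchanged, and only removing it from tracking (what `#complete` does) lowers it.

```ruby
# Sketch: size counts tracked jobs per group, not what is left in the main queue.
FakeJob = Struct.new(:group_id, :id) # hypothetical job type

queue = Queue.new
in_processing = Hash.new { |h, k| h[k] = [] }

total_size = -> { in_processing.values.map(&:size).sum }

[FakeJob.new(:a, 1), FakeJob.new(:a, 2), FakeJob.new(:b, 1)].each do |job|
  in_processing[job.group_id] << job
  queue << job
end

popped = queue.pop        # a worker picks up a job
after_pop = total_size.call # still 3: the job is in processing, not finished

in_processing[popped.group_id].delete(popped) # what #complete does
after_complete = total_size.call

puts after_pop      # => 3
puts after_complete # => 2
```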
#statistics ⇒ Hash
Returns a frozen hash with:
- `busy`: number of jobs that are currently being processed (active work)
- `enqueued`: number of jobs in the queue that are waiting to be picked up by a worker
# File 'lib/karafka/processing/jobs_queue.rb', line 132
def statistics
  {
    busy: size - @queue.size,
    enqueued: @queue.size
  }.freeze
end
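A worked example of the `#statistics` arithmetic with illustrative data: four unfinished jobs are tracked, three of them are still waiting in the main queue, so one is busy.

```ruby
# Worked example: busy = tracked jobs - queued jobs, enqueued = queued jobs.
tracked = { group_a: %i[j1 j2 j3], group_b: %i[j4] } # 4 unfinished jobs (illustrative)
queue = Queue.new
%i[j2 j3 j4].each { |j| queue << j }                 # 3 not yet picked up by a worker

size = tracked.values.map(&:size).sum

stats = {
  busy: size - queue.size, # 4 - 3 = 1 job actively processed
  enqueued: queue.size
}.freeze

puts stats[:busy]     # => 1
puts stats[:enqueued] # => 3
puts stats.frozen?    # => true
```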
#tick(group_id) ⇒ Object
Note: This does not release the wait lock; it just causes the conditions to be re-checked.
Causes the wait lock to re-check the lock conditions and potentially unlock.
# File 'lib/karafka/processing/jobs_queue.rb', line 70
def tick(group_id)
  @semaphores[group_id] << true
end
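A sketch of the tick idea with a hypothetical wait condition (`pending.positive?`): each tick pushes a token that wakes the waiter only long enough to re-check. The first tick arrives while work remains, so the waiter blocks again; the second arrives after the work is done, so it exits. The `sleep ... until` loops exist only to make the demo deterministic.

```ruby
# Sketch: a tick does not unlock by itself, it only triggers a re-check.
semaphore = Queue.new
pending = 2 # hypothetical number of unfinished jobs in the group
lock = Mutex.new
rechecks = 0

waiter = Thread.new do
  # Hypothetical wait condition: block while the group still has unfinished jobs
  while lock.synchronize { pending.positive? }
    semaphore.pop # sleep until someone ticks
    rechecks += 1
  end
end

tick = -> { semaphore << true } # what #tick does: push one wake-up token

# Let the waiter reach its blocking pop before each tick
sleep 0.001 until waiter.status == "sleep"
lock.synchronize { pending -= 1 }
tick.call # pending is 1: the waiter re-checks and blocks again

sleep 0.001 until rechecks == 1 && waiter.status == "sleep"
lock.synchronize { pending -= 1 }
tick.call # pending is 0: the waiter re-checks and exits

waiter.join
puts rechecks # => 2
```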
#wait(group_id) ⇒ Object
Note: This method is blocking.
Blocks when there are things in the queue for a given group and waits until all the blocking jobs from that group are completed.
# File 'lib/karafka/processing/jobs_queue.rb', line 122
def wait(group_id)
  # Go doing other things while we cannot process and wait for anyone to finish their work
  # and re-check the wait status
  @semaphores[group_id].pop while wait?(group_id)
end
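An end-to-end sketch of `#wait` for a single group. Karafka's private `#wait?` is not shown on this page, so `waiting` below is an assumption that simply checks whether the group still has unfinished jobs; the worker threads and lambdas are illustrative, not Karafka's classes.

```ruby
# Sketch: a listener blocks until workers have completed every job of its group.
in_processing = [] # unfinished jobs of one group (illustrative)
semaphore = Queue.new
queue = Queue.new
mutex = Mutex.new

# Hypothetical stand-in for the private #wait?: block while jobs remain
waiting = -> { mutex.synchronize { !in_processing.empty? } }

# Same shape as #wait: sleep on the semaphore until a tick, then re-check
wait_for_group = -> { semaphore.pop while waiting.call }

complete = lambda do |job|
  mutex.synchronize do
    in_processing.delete(job)
    semaphore << true # tick
  end
end

%i[j1 j2 j3].each do |job|
  mutex.synchronize { in_processing << job }
  queue << job
end
queue.close

workers = 2.times.map do
  Thread.new do
    while (job = queue.pop)
      sleep 0.01 # simulate work
      complete.call(job)
    end
  end
end

wait_for_group.call # returns only after all three jobs were completed
remaining = mutex.synchronize { in_processing.size }
workers.each(&:join)

puts remaining # => 0
```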