Class: Karafka::Pro::Processing::JobsQueue
- Inherits: Karafka::Processing::JobsQueue
  - Object
  - Karafka::Processing::JobsQueue
  - Karafka::Pro::Processing::JobsQueue
- Includes: Core::Helpers::Time
- Defined in: lib/karafka/pro/processing/jobs_queue.rb
Overview
Enhanced processing queue that provides the ability to build complex work-distribution schedulers dedicated to particular job types.
Aside from the OSS queue capabilities, it allows for jobless locking for advanced schedulers.
Instance Attribute Summary
- #in_processing ⇒ Object
  Returns the value of attribute in_processing.
Instance Method Summary
- #clear(group_id) ⇒ Object
  Clears the processing states for a provided group.
- #empty?(group_id) ⇒ Boolean
  Checks whether there is nothing in processing, nothing waiting, and no async lock for a given group.
- #initialize ⇒ Karafka::Pro::Processing::JobsQueue
  Constructor.
- #lock(job) ⇒ Object
  Allows us to lock the queue on a given subscription group without enqueuing a job.
- #lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
  Allows for explicit locking of the queue of a given subscription group.
- #register(group_id) ⇒ Object
  Registers a semaphore and a lock hash.
- #unlock(job) ⇒ Object
  Unlocks the given subscription group queue space that was locked with a job that was not added to the queue but used via `#lock`.
- #unlock_async(group_id, lock_id) ⇒ Object
  Allows for explicit unlocking of a group's locked queue.
- #wait(group_id) ⇒ Object
  Blocks while there are things in the queue for a given group and waits until all the blocking jobs from that group are completed or any of the locks times out.
Methods inherited from Karafka::Processing::JobsQueue
#<<, #close, #complete, #pop, #statistics, #tick
Constructor Details
#initialize ⇒ Karafka::Pro::Processing::JobsQueue
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 32
def initialize
  super

  @in_waiting = Hash.new { |h, k| h[k] = [] }
  @locks = Hash.new { |h, k| h[k] = {} }
  @async_locking = false
  @statistics[:waiting] = 0
end
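The constructor relies on Ruby's `Hash.new` with a default block, so per-group state is created lazily on first access (this is also why `#register` only needs to read `@locks[group_id]`). The snippet below is a standalone plain-Ruby illustration of that pattern; the group names are hypothetical.

```ruby
# Per-group storage that materializes on first access, mirroring the
# `Hash.new { |h, k| h[k] = ... }` pattern used in #initialize.
in_waiting = Hash.new { |h, k| h[k] = [] }
locks = Hash.new { |h, k| h[k] = {} }

# Reading a missing key runs the block, which stores a fresh value for that key.
in_waiting['group_a'] << :job1
locks['group_b'] # merely touching the key creates its (empty) hash

puts in_waiting.keys.inspect # => ["group_a"]
puts locks.keys.inspect      # => ["group_b"]

# Each group gets its own array; nothing is shared between keys.
in_waiting['group_c'] << :job2
puts in_waiting['group_a'].inspect # => [:job1]
```

With `Hash.new([])` instead, every missing key would return the same shared array and nothing would be stored under the key, which is why the block form is the appropriate choice for per-group state.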
Instance Attribute Details
#in_processing ⇒ Object
Returns the value of attribute in_processing.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 24
def in_processing
  @in_processing
end
Instance Method Details
#clear(group_id) ⇒ Object
Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 132
def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear

    @statistics[:waiting] -= @in_waiting[group_id].size
    @in_waiting[group_id].clear
    @locks[group_id].clear
    @async_locking = false

    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end
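Because the `waiting` statistic is global while `#clear` targets one group, the counter is reduced by the size of that group's list rather than reset. The following is a hypothetical, simplified model (`GroupState` and `add_waiting` are illustrative names, not Karafka APIs) that isolates this bookkeeping.

```ruby
# Hypothetical, simplified model of the per-group bookkeeping touched by #clear.
# It is NOT the Karafka class; it only shows why the global `waiting` counter is
# reduced by the cleared group's size instead of being reset to zero.
class GroupState
  attr_reader :statistics, :in_waiting

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @statistics = { waiting: 0 }
  end

  # Illustrative helper standing in for #lock
  def add_waiting(group_id, job)
    @mutex.synchronize do
      @in_waiting[group_id] << job
      @statistics[:waiting] += 1
    end
  end

  # Only the given group's entries are dropped; other groups keep theirs.
  def clear(group_id)
    @mutex.synchronize do
      @statistics[:waiting] -= @in_waiting[group_id].size
      @in_waiting[group_id].clear
    end
  end
end

state = GroupState.new
state.add_waiting(:a, :job1)
state.add_waiting(:a, :job2)
state.add_waiting(:b, :job3)
state.clear(:a)
puts state.statistics[:waiting] # => 1
```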
#empty?(group_id) ⇒ Boolean
Returns true when a given group has nothing in processing, nothing waiting, and no async lock in place.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 150
def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty? &&
      @in_waiting[group_id].empty? &&
      !locked_async?(group_id)
  end
end
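The emptiness check combines three independent conditions. The private `locked_async?` helper is not shown on this page, so the sketch below assumes that an async lock counts as active until its millisecond deadline passes; `group_empty?` and the deadline-based `locked_async?` are hypothetical stand-ins.

```ruby
# Assumption: an async lock is active until its deadline (ms on a monotonic clock)
# has passed. The real private `locked_async?` is not shown in this document.
def locked_async?(locks, now_ms)
  locks.values.any? { |deadline| deadline > now_ms }
end

# Hypothetical standalone version of the #empty? condition: nothing processing,
# nothing held via #lock, and no still-active lock from #lock_async.
def group_empty?(in_processing, in_waiting, locks, now_ms)
  in_processing.empty? && in_waiting.empty? && !locked_async?(locks, now_ms)
end

now = 10_000
puts group_empty?([], [], {}, now)                   # => true
puts group_empty?([], [:job], {}, now)               # => false
puts group_empty?([], [], { lock1: now + 500 }, now) # => false
puts group_empty?([], [], { lock1: now - 1 }, now)   # => true
```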
#lock(job) ⇒ Object
Allows us to lock the queue on a given subscription group without enqueuing a job. This can be used when building complex schedulers that want to postpone enqueuing until certain conditions are met.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 57
def lock(job)
  @mutex.synchronize do
    group = @in_waiting[job.group_id]

    # This should never happen. Same job should not be locked twice
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    @statistics[:waiting] += 1

    group << job
  end
end
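To show the guard against double locking in isolation, here is a minimal sketch under stated assumptions: `SketchJob` stands in for a Karafka job (only `group_id` matters here) and `SyncError` stands in for `Errors::JobsQueueSynchronizationError`. Neither is part of Karafka.

```ruby
# Hypothetical stand-ins; not Karafka classes.
class SyncError < StandardError; end
SketchJob = Struct.new(:group_id, :name)

# Simplified sketch of #lock: the job is recorded as "waiting" for its group
# without being pushed onto any work queue.
class JoblessLocker
  attr_reader :waiting_count

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @waiting_count = 0
  end

  def lock(job)
    @mutex.synchronize do
      group = @in_waiting[job.group_id]
      # Locking the same job twice indicates a scheduler bug
      raise(SyncError, job.group_id.to_s) if group.include?(job)

      @waiting_count += 1
      group << job
    end
  end
end

locker = JoblessLocker.new
job = SketchJob.new(:group_a, 'consume')
locker.lock(job)

begin
  locker.lock(job)
rescue SyncError => e
  puts "second lock rejected for #{e.message}" # => second lock rejected for group_a
end
```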
#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
Note: unlike `#lock`, this method does not raise `Errors::JobsQueueSynchronizationError` for an already existing lock, because we want to be able to prolong time-limited locks.
Allows for explicit locking of the queue of a given subscription group.
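This can be used for cross-topic synchronization. The `timeout` is expressed in milliseconds; the lock deadline is stored as `monotonic_now + timeout`.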
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 98
def lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT)
  return if @queue.closed?

  @async_locking = true

  @mutex.synchronize do
    @locks[group_id][lock_id] = monotonic_now + timeout

    # We need to tick so our new time sensitive lock can reload time constraints on sleep
    tick(group_id)
  end
end
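The "prolong" behavior follows from storing an absolute deadline per lock id: a second call with the same id simply overwrites it. The sketch below models only that part; `monotonic_now_ms` is a hypothetical helper built on `Process.clock_gettime`, and the plain-hash `lock_async` function is not Karafka's method.

```ruby
# Hypothetical helper: monotonic clock in milliseconds (float).
def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end

# Hypothetical sketch: each lock id maps to an absolute deadline in ms. Writing the
# same id again overwrites the deadline, which is how a lock gets prolonged.
def lock_async(locks, group_id, lock_id, timeout_ms)
  locks[group_id][lock_id] = monotonic_now_ms + timeout_ms
end

locks = Hash.new { |h, k| h[k] = {} }

lock_async(locks, :group_a, :barrier, 1_000)
first_deadline = locks[:group_a][:barrier]

lock_async(locks, :group_a, :barrier, 5_000) # same id: deadline moves later
second_deadline = locks[:group_a][:barrier]

puts locks[:group_a].size             # => 1
puts second_deadline > first_deadline # => true
```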
#register(group_id) ⇒ Object
Registers a semaphore and a lock hash.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 45
def register(group_id)
  super
  @mutex.synchronize do
    @locks[group_id]
  end
end
#unlock(job) ⇒ Object
Unlocks the given subscription group queue space that was locked with a job that was not added to the queue but used via `#lock`.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 74
def unlock(job)
  @mutex.synchronize do
    @statistics[:waiting] -= 1

    return if @in_waiting[job.group_id].delete(job)

    # This should never happen. It means there was a job being unlocked that was never
    # locked in the first place
    raise(Errors::JobsQueueSynchronizationError, job.group_id)
  end
end
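A jobless lock must be released with a matching unlock. This minimal sketch pairs the two under stated assumptions: `PairedLocker` and `UnlockError` are hypothetical stand-ins, and, unlike the source above (which decrements before checking), this sketch decrements the counter only after a successful removal.

```ruby
# Hypothetical stand-in for Errors::JobsQueueSynchronizationError.
class UnlockError < StandardError; end

# Simplified sketch of the #lock / #unlock pairing for jobless locks.
class PairedLocker
  attr_reader :waiting

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @waiting = 0
  end

  def lock(group_id, job)
    @mutex.synchronize do
      @waiting += 1
      @in_waiting[group_id] << job
    end
  end

  def unlock(group_id, job)
    @mutex.synchronize do
      # Array#delete returns the removed element, or nil when it was not present
      if @in_waiting[group_id].delete(job)
        @waiting -= 1
        return
      end

      # Unlocking something that was never locked is a synchronization bug
      raise(UnlockError, group_id.to_s)
    end
  end
end

locker = PairedLocker.new
locker.lock(:group_a, :job1)
locker.unlock(:group_a, :job1)
puts locker.waiting # => 0

raised = false
begin
  locker.unlock(:group_a, :job1) # not locked anymore
rescue UnlockError
  raised = true
end
puts raised # => true
```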
#unlock_async(group_id, lock_id) ⇒ Object
Allows for explicit unlocking of a group's locked queue.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 116
def unlock_async(group_id, lock_id)
  @mutex.synchronize do
    if @locks[group_id].delete(lock_id)
      tick(group_id)

      return
    end

    raise(Errors::JobsQueueSynchronizationError, [group_id, lock_id])
  end
end
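Releasing an async lock both removes its deadline and wakes anyone blocked on the group, while an unknown lock id is treated as an error. In this hypothetical sketch a plain `Queue` per group plays the role of the semaphore that `tick` signals; `AsyncLocks` and `AsyncUnlockError` are illustrative names, not Karafka APIs.

```ruby
# Hypothetical stand-in for Errors::JobsQueueSynchronizationError.
class AsyncUnlockError < StandardError; end

class AsyncLocks
  def initialize
    @mutex = Mutex.new
    @locks = Hash.new { |h, k| h[k] = {} }
    # One Queue per group acts as a semaphore (assumed stand-in for `tick`)
    @semaphores = Hash.new { |h, k| h[k] = Queue.new }
  end

  def lock_async(group_id, lock_id, deadline_ms)
    @mutex.synchronize { @locks[group_id][lock_id] = deadline_ms }
  end

  def unlock_async(group_id, lock_id)
    @mutex.synchronize do
      if @locks[group_id].delete(lock_id)
        @semaphores[group_id] << true # wake anyone blocked on this group
        return
      end

      raise(AsyncUnlockError, [group_id, lock_id].inspect)
    end
  end

  def semaphore(group_id)
    @mutex.synchronize { @semaphores[group_id] }
  end
end

locks = AsyncLocks.new
locks.lock_async(:group_a, :barrier, 1_000.0)
waiter = Thread.new { locks.semaphore(:group_a).pop } # blocks until woken
locks.unlock_async(:group_a, :barrier)
waiter.join
puts waiter.value # => true

error_message = nil
begin
  locks.unlock_async(:group_a, :barrier) # already released
rescue AsyncUnlockError => e
  error_message = e.message
end
puts error_message # => [:group_a, :barrier]
```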
#wait(group_id) ⇒ Object
Because the check for whether async locking is enabled happens on regular ticks, the first lock on a group can take up to one tick to take effect. This is expected.
This implementation takes temporary async locks into account. Because the shortest remaining lock time is used as the timeout, we do not have to wait a whole tick period for async locks to expire.
Blocks while there are things in the queue for a given group and waits until all the blocking jobs from that group are completed or any of the locks times out.
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 169
def wait(group_id)
  return super unless @async_locking

  # We do not generalize this flow because this one is more expensive as it has to allocate
  # extra objects. That's why we only use it when locks are actually in use
  base_interval = tick_interval / 1_000.0

  while wait?(group_id)
    yield if block_given?

    now = monotonic_now

    wait_times = @locks[group_id].values.map! do |lock_time|
      # Convert ms to seconds, seconds are required by Ruby queue engine
      (lock_time - now) / 1_000
    end

    wait_times.delete_if(&:negative?)
    wait_times << base_interval

    @semaphores.fetch(group_id).pop(timeout: wait_times.min)
  end
end
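The timeout arithmetic inside the loop can be pulled out and checked on its own. The sketch below is a hypothetical extraction (`wait_timeout` is not a Karafka method): remaining lock times are converted from milliseconds to seconds, expired locks are dropped, the tick interval is always a candidate, and the smallest value wins. It divides by `1_000.0` so integer inputs also yield fractional seconds.

```ruby
# Hypothetical extraction of the timeout computation used by #wait.
# lock_deadlines_ms: absolute deadlines (ms, monotonic clock)
# now_ms:            current monotonic time in ms
# tick_interval_ms:  regular tick interval in ms
def wait_timeout(lock_deadlines_ms, now_ms, tick_interval_ms)
  base_interval = tick_interval_ms / 1_000.0

  # Remaining time per lock, converted from ms to seconds
  wait_times = lock_deadlines_ms.map { |deadline| (deadline - now_ms) / 1_000.0 }
  # Locks whose deadline already passed do not shorten the wait
  wait_times.delete_if(&:negative?)
  # The tick interval is the upper bound when no lock expires sooner
  wait_times << base_interval

  wait_times.min
end

puts wait_timeout([10_250.0, 12_000.0], 10_000.0, 5_000) # => 0.25
puts wait_timeout([9_000.0], 10_000.0, 5_000)            # => 5.0
puts wait_timeout([], 10_000.0, 5_000)                   # => 5.0
```

The resulting value (in seconds) is what gets passed to `Queue#pop(timeout:)`, a keyword available since Ruby 3.2, so a waiter wakes up no later than the earliest lock deadline or the next tick.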