Class: ImprovedQueue::ImprovedSizedQueue
- Inherits:
-
Object
- Object
- ImprovedQueue::ImprovedSizedQueue
- Defined in:
- lib/improved-queue.rb
Overview
In most respects this class can be used identically to SizedQueue. It implements all of the methods which are usually available, but bears the important distinction that any thread blocking on a dequeue or enqueue operation can be “woken up” by calling the unblock methods and passing a thread object or an array of thread objects.
This can be done independently of the status of the queue (full, empty, etc.), so it is superior to makeshift resolution mechanisms such as passing a custom message or dummy item into the queue, because there is no need to maintain flags or Mutex objects in your program to ensure that such an operation is not at risk of causing deadlock.
When a blocking thread receives the unblock signal, it will abandon its planned operation and raise UnblockError. As UnblockError inherits from StopIteration, it can be rescued in a variety of ways, including implicitly through Kernel.loop, allowing you to further simplify your program logic:
queue = ImprovedQueue.new 50
threads = []
# a consumer thread
threads << Thread.new do
loop do
val = queue.pop
do_something(val)
end
end
# another consumer
threads << Thread.new do
loop do
val = queue.pop
do_something(val)
end
end
# a producer
threads << Thread.new do
while not_finished_yet == true
queue << some_stuff
...
end
queue.unblock_deq [threads[0], threads[1]]
end
threads.each {|thr| thr.join }
One important thing to note: calling #unblock_deq is always safe (will never lose data), but when calling #unblock_enq, it is your responsibility to decide what is to be done with an item which failed to enqueue:
queue = ImprovedQueue.new 10
1.upto(11) do |x|
begin
queue << x
rescue ImprovedQueue::UnblockError
dont_lose_things(x)
end
end
Instance Attribute Summary collapse
-
#max ⇒ Object
Returns the current maximum size of the queue.
Instance Method Summary collapse
-
#clear ⇒ Object
Deletes all items from the queue.
-
#clear_blacklists(thrs = nil) ⇒ Object
Removes a thread or threads from both blacklists and allows them to block on either enqueue or dequeue.
-
#clear_deq(thrs = nil) ⇒ Object
Clear the dequeue blacklist or remove specified threads to allow them to block on dequeue once again:.
-
#clear_enq(thrs = nil) ⇒ Object
Clear the enqueue blacklist or remove specified threads to allow them to block on enqueue once again:.
-
#deq ⇒ Object
(also: #pop, #shift)
Removes and returns the next item from the queue.
-
#empty? ⇒ Boolean
Returns true if the queue is empty; false otherwise.
-
#enq(val) ⇒ Object
(also: #<<, #push)
Adds an item to the queue.
-
#initialize(size) ⇒ ImprovedSizedQueue
constructor
Returns a new instance of ImprovedQueue::ImprovedSizedQueue.
-
#length ⇒ Object
(also: #size)
Returns the total number of items currently in the queue.
-
#num_waiting ⇒ Object
Returns the number of threads currently waiting on the queue.
-
#reset ⇒ Object
Completely resets the queue: deletes all queue contents, raises UnblockError in any waiting threads, and clears all blacklists, such that all threads can once again block on enqueue or dequeue.
-
#unblock_deq(thrs = nil) ⇒ Object
Call this method to notify blocking threads that they should no longer wait to dequeue:.
-
#unblock_enq(thrs = nil) ⇒ Object
Call this method to notify blocking threads that they should no longer wait to enqueue:.
-
#unblock_waiting(thrs = nil) ⇒ Object
Stops a thread from blocking on either enqueue or dequeue.
Constructor Details
#initialize(size) ⇒ ImprovedSizedQueue
Returns a new instance of ImprovedQueue::ImprovedSizedQueue. Argument to ImprovedSizedQueue.new must be an integer value greater than 1.
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/improved-queue.rb', line 137 def initialize(size) raise TypeError unless size.is_a? Integer and size > 1 @que = [] @max = size @mutex = Mutex.new @kill_enq = [] @kill_deq = [] @deq_waiting = [] @enq_waiting = [] @unblock = UnblockError.new end |
Instance Attribute Details
#max ⇒ Object
Returns the current maximum size of the queue.
152 153 154 |
# File 'lib/improved-queue.rb', line 152 def max @max end |
Instance Method Details
#clear ⇒ Object
Deletes all items from the queue. Threads waiting to dequeue will continue waiting; threads waiting to enqueue will be allowed to proceed (until the queue is full again).
353 354 355 356 357 358 |
# File 'lib/improved-queue.rb', line 353 def clear @mutex.synchronize do @que.clear enqueue_multiple_waiting end end |
#clear_blacklists(thrs = nil) ⇒ Object
Removes a thread or threads from both blacklists and allows them to block on either enqueue or dequeue. Equivalent to calling:
queue.clear_deq thread1
queue.clear_enq thread1
318 319 320 321 322 323 |
# File 'lib/improved-queue.rb', line 318 def clear_blacklists(thrs=nil) @mutex.synchronize do enq_allow thrs deq_allow thrs end end |
#clear_deq(thrs = nil) ⇒ Object
Clear the dequeue blacklist or remove specified threads to allow them to block on dequeue once again:
queue.clear_deq([thread1, thread2, ...])
queue.clear_deq thread3
queue.clear_deq # allow all threads to block on dequeue
Note: raises a TypeError if you attempt to remove specific threads from the blacklist after unblocking all. i.e. this is okay:
queue.unblock_deq [thread1, thread2]
queue.clear_deq thread2
… but this is not:
queue.unblock_deq
queue.clear_deq thread2
307 308 309 |
# File 'lib/improved-queue.rb', line 307 def clear_deq(thrs=nil) @mutex.synchronize { deq_allow thrs } end |
#clear_enq(thrs = nil) ⇒ Object
Clear the enqueue blacklist or remove specified threads to allow them to block on enqueue once again:
queue.clear_enq([thread1, thread2, ...])
queue.clear_enq thread3
queue.clear_enq # allow all threads to block on enqueue
Note: raises a TypeError if you attempt to remove specific threads from the blacklist after unblocking all. i.e. this is okay:
queue.unblock_enq [thread1, thread2]
queue.clear_enq thread2
… but this is not:
queue.unblock_enq
queue.clear_enq thread2
284 285 286 |
# File 'lib/improved-queue.rb', line 284 def clear_enq(thrs=nil) @mutex.synchronize { enq_allow thrs } end |
#deq ⇒ Object Also known as: pop, shift
Removes and returns the next item from the queue. Will block if the queue is empty, until an item becomes available to dequeue, or until #unblock_deq is called, in which case it will abandon the dequeue and raise UnblockError.
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/improved-queue.rb', line 160 def deq @mutex.lock if @que.size > 0 val = @que.shift if next_waiting = @enq_waiting.shift @que << next_waiting[:value] next_waiting[:pipe] << true end @mutex.unlock else if @kill_deq == true or @kill_deq.include? Thread.current @mutex.unlock raise UnblockError end pipe = Queue.new @deq_waiting << {thread: Thread.current, pipe: pipe} @mutex.unlock val = pipe.pop raise UnblockError if @unblock.equal? val end val end |
#empty? ⇒ Boolean
Returns true if the queue is empty; false otherwise.
337 338 339 |
# File 'lib/improved-queue.rb', line 337 def empty? @mutex.synchronize { @que.size == 0 } end |
#enq(val) ⇒ Object Also known as: <<, push
Adds an item to the queue. Will block if the queue is full, until an item is removed from the queue, or until #unblock_enq is called, in which case it will abandon the enqueue and raise an UnblockError. Important: If data integrity is important, you will need to include any code to deal with the orphaned item in your rescue clause.
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/improved-queue.rb', line 193 def enq(val) @mutex.lock if @que.size < @max if next_waiting = @deq_waiting.shift next_waiting[:pipe] << val else @que << val end @mutex.unlock else if @kill_enq == true or @kill_enq.include? Thread.current @mutex.unlock raise UnblockError end pipe = Queue.new @enq_waiting << {thread: Thread.current, pipe: pipe, value: val} @mutex.unlock answer = pipe.pop raise UnblockError if @unblock.equal? answer end nil end |
#length ⇒ Object Also known as: size
Returns the total number of items currently in the queue.
328 329 330 |
# File 'lib/improved-queue.rb', line 328 def length @mutex.synchronize { @que.size } end |
#num_waiting ⇒ Object
Returns the number of threads currently waiting on the queue.
344 345 346 |
# File 'lib/improved-queue.rb', line 344 def num_waiting @mutex.synchronize { @enq_waiting.size + @deq_waiting.size } end |
#reset ⇒ Object
Completely resets the queue: deletes all queue contents, raises UnblockError in any waiting threads, and clears all blacklists, such that all threads can once again block on enqueue or dequeue.
365 366 367 368 369 370 371 372 373 |
# File 'lib/improved-queue.rb', line 365 def reset @mutex.synchronize do @que.clear @kill_enq = [] @kill_deq = [] @enq_waiting.each {|waiting| waiting[:pipe] << @unblock }.clear @deq_waiting.each {|waiting| waiting[:pipe] << @unblock }.clear end end |
#unblock_deq(thrs = nil) ⇒ Object
Call this method to notify blocking threads that they should no longer wait to dequeue:
queue.unblock_deq([thread1, thread2, ...])
queue.unblock_deq thread3
queue.unblock_deq # unblock all threads
Note that the effect is permanent; once a thread has been unblocked, it is blacklisted and can no longer wait on this queue. To clear the dequeue blacklist, see the #clear_blacklists and #clear_deq methods.
247 248 249 |
# File 'lib/improved-queue.rb', line 247 def unblock_deq(thrs=nil) @mutex.synchronize { deq_shutdown thrs } end |
#unblock_enq(thrs = nil) ⇒ Object
Call this method to notify blocking threads that they should no longer wait to enqueue:
queue.unblock_enq([thread1, thread2, ...])
queue.unblock_enq thread3
queue.unblock_enq # unblock all threads
Note that the effect is permanent; once a thread has been unblocked, it is blacklisted and can no longer wait on this queue. To clear the enqueue blacklist, see the #clear_blacklists and #clear_enq methods.
231 232 233 |
# File 'lib/improved-queue.rb', line 231 def unblock_enq(thrs=nil) @mutex.synchronize { enq_shutdown thrs } end |
#unblock_waiting(thrs = nil) ⇒ Object
Stops a thread from blocking on either enqueue or dequeue. Equivalent to calling:
queue.unblock_deq thread1
queue.unblock_enq thread1
258 259 260 261 262 263 |
# File 'lib/improved-queue.rb', line 258 def unblock_waiting(thrs=nil) @mutex.synchronize do enq_shutdown thrs deq_shutdown thrs end end |