Class: Hoodoo::Communicators::Pool::QueueWithTimeout
- Inherits:
-
Object
- Object
- Hoodoo::Communicators::Pool::QueueWithTimeout
- Defined in:
- lib/hoodoo/communicators/pool.rb
Overview
Internal implementation detail of Hoodoo::Communicators::Pool.
Since pool clients can say “wait until (one or all) workers have processed their Queue contents”, we need to have some way of seeing when all work is done. The clean way to do it is to push ‘sync now’ messages onto the communicator Threads work Queues, so that as they work through the Queue they’ll eventually reach that message. They then push a message onto a sync Queue for that worker. Meanwhile the waiting pool does (e.g.) a pop
on the sync Queue, which means it blocks until the workers say they’ve finished. No busy waiting, Ruby gets to make its best guess at scheduling, etc.; all good.
The catch? You can’t use Timeout::timeout...do...
around a Queue pop
. It just doesn’t work. It’s a strange omission and requires code gymnastics to work around.
Enter QueueWithTimeout, from:
http://spin.atomicobject.com/2014/07/07/ruby-queue-pop-timeout/
Instance Method Summary collapse
-
#<<(entry) ⇒ Object
Push a new entry to the end of the queue.
-
#initialize ⇒ QueueWithTimeout
constructor
Create a new instance.
-
#shift(timeout = nil) ⇒ Object
Take an entry from the front of the queue (FIFO) with optional timeout if the queue is empty.
Constructor Details
#initialize ⇒ QueueWithTimeout
Create a new instance.
494 495 496 497 498 |
# File 'lib/hoodoo/communicators/pool.rb', line 494 def initialize @mutex = ::Mutex.new @queue = [] @recieved = ::ConditionVariable.new end |
Instance Method Details
#<<(entry) ⇒ Object
Push a new entry to the end of the queue.
entry
-
Entry to put onto the end of the queue.
504 505 506 507 508 509 |
# File 'lib/hoodoo/communicators/pool.rb', line 504 def <<( entry ) @mutex.synchronize do @queue << entry @recieved.signal end end |
#shift(timeout = nil) ⇒ Object
Take an entry from the front of the queue (FIFO) with optional timeout if the queue is empty.
timeout
-
Timeout (in seconds, Integer or Float) to wait for an item to appear on the queue, if the queue is empty. If
nil
, there is no timeout (waits indefinitely). Optional; default isnil
.
If given a non-nil
timeout value and the timeout expires, raises a ThreadError exception (just as non-blocking Ruby Queue#pop would).
522 523 524 525 526 527 528 529 530 531 |
# File 'lib/hoodoo/communicators/pool.rb', line 522 def shift( timeout = nil ) @mutex.synchronize do if @queue.empty? @recieved.wait( @mutex, timeout ) if timeout != 0 raise( ThreadError, 'queue empty' ) if @queue.empty? end @queue.shift end end |