Class: Resque::MultiQueue
- Inherits:
-
Object
- Object
- Resque::MultiQueue
- Includes:
- Mutex_m
- Defined in:
- lib/resque/multi_queue.rb
Overview
Holds multiple queues, allowing you to pop the first available job
Instance Method Summary collapse
-
#initialize(queues, redis) ⇒ MultiQueue
constructor
Create a new MultiQueue using the
queues
from theredis
connection. -
#poll(timeout) ⇒ Object
Retrieves data from the queue head, and removes it.
-
#pop(non_block = false) ⇒ Object
Pop an item off one of the queues.
Constructor Details
#initialize(queues, redis) ⇒ MultiQueue
Create a new MultiQueue using the queues
from the redis
connection
14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/resque/multi_queue.rb', line 14 def initialize(queues, redis) super() @queues = queues # since ruby 1.8 doesn't have Ordered Hashes @queue_hash = {} @redis = redis queues.each do |queue| key = @redis.is_a?(Redis::Namespace) ? "#{@redis.namespace}:" : "" key += queue.redis_name @queue_hash[key] = queue end end |
Instance Method Details
#poll(timeout) ⇒ Object
Retrieves data from the queue head, and removes it.
Blocks for timeout
seconds if the queue is empty, and returns nil if the timeout expires.
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/resque/multi_queue.rb', line 62 def poll(timeout) queue_names = @queues.map {|queue| queue.redis_name } queue_name, payload = @redis.blpop(*(queue_names + [timeout])) return unless payload synchronize do queue = @queue_hash[queue_name] [queue, queue.decode(payload)] end end |
#pop(non_block = false) ⇒ Object
Pop an item off one of the queues. This method will block until an item is available. This method returns a tuple of the queue object and job.
Pass true
for a non-blocking pop. If nothing is read on a non-blocking pop, a ThreadError is raised.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/resque/multi_queue.rb', line 33 def pop(non_block = false) if non_block synchronize do value = nil @queues.each do |queue| begin return [queue, queue.pop(true)] rescue ThreadError end end raise ThreadError end else queue_names = @queues.map {|queue| queue.redis_name } synchronize do value = @redis.blpop(*(queue_names + [1])) until value queue_name, payload = value queue = @queue_hash[queue_name] [queue, queue.decode(payload)] end end end |