Class: Resque::MultiQueue

Inherits:
Object
  • Object
show all
Includes:
Mutex_m
Defined in:
lib/resque/multi_queue.rb

Overview

Holds multiple queues, allowing you to pop the first available job

Instance Method Summary collapse

Constructor Details

#initialize(queues, redis) ⇒ MultiQueue

Create a new MultiQueue using the queues from the redis connection



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/resque/multi_queue.rb', line 14

def initialize(queues, redis)
  super()

  @queues     = queues # since ruby 1.8 doesn't have Ordered Hashes
  @queue_hash = {}
  @redis      = redis

  queues.each do |queue|
    key = @redis.is_a?(Redis::Namespace) ? "#{@redis.namespace}:" : ""
    key += queue.redis_name
    @queue_hash[key] = queue
  end
end

Instance Method Details

#poll(timeout) ⇒ Object

Retrieves data from the queue head, and removes it.

Blocks for timeout seconds if the queue is empty, and returns nil if the timeout expires.



62
63
64
65
66
67
68
69
70
71
# File 'lib/resque/multi_queue.rb', line 62

def poll(timeout)
  queue_names = @queues.map {|queue| queue.redis_name }
  queue_name, payload = @redis.blpop(*(queue_names + [timeout]))
  return unless payload

  synchronize do
    queue = @queue_hash[queue_name]
    [queue, queue.decode(payload)]
  end
end

#pop(non_block = false) ⇒ Object

Pop an item off one of the queues. This method will block until an item is available. This method returns a tuple of the queue object and job.

Pass true for a non-blocking pop. If nothing is read on a non-blocking pop, a ThreadError is raised.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/resque/multi_queue.rb', line 33

def pop(non_block = false)
  if non_block
    synchronize do
      value = nil

      @queues.each do |queue|
        begin
          return [queue, queue.pop(true)]
        rescue ThreadError
        end
      end

      raise ThreadError
    end
  else
    queue_names = @queues.map {|queue| queue.redis_name }
    synchronize do
      value = @redis.blpop(*(queue_names + [1])) until value
      queue_name, payload = value
      queue = @queue_hash[queue_name]
      [queue, queue.decode(payload)]
    end
  end
end