Class: Resque::Plugins::Brokered::Broker
- Inherits:
-
Object
- Object
- Resque::Plugins::Brokered::Broker
- Defined in:
- lib/resque/plugins/brokered/broker.rb
Instance Method Summary collapse
- #available_queues ⇒ Object
- #filter_queues(queues) ⇒ Object
- #get_queue ⇒ Object
-
#initialize(redis, queues) ⇒ Broker
constructor
A new instance of Broker.
- #pop ⇒ Object
- #queues_regex ⇒ Object
Constructor Details
#initialize(redis, queues) ⇒ Broker
Returns a new instance of Broker.
5 6 7 8 |
# File 'lib/resque/plugins/brokered/broker.rb', line 5 def initialize redis, queues @redis = redis @queues = queues end |
Instance Method Details
#available_queues ⇒ Object
10 11 12 |
# File 'lib/resque/plugins/brokered/broker.rb', line 10 def available_queues @redis.sdiff :queues, :active_queues end |
#filter_queues(queues) ⇒ Object
14 15 16 |
# File 'lib/resque/plugins/brokered/broker.rb', line 14 def filter_queues queues queues.select {|q| queues_regex.match q} end |
#get_queue ⇒ Object
18 19 20 21 |
# File 'lib/resque/plugins/brokered/broker.rb', line 18 def get_queue queues = filter_queues(available_queues) queues.shuffle.detect {|name| @redis.llen("queue:#{name}") > 0 } end |
#pop ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/resque/plugins/brokered/broker.rb', line 27 def pop @redis.watch "#{@redis.namespace}:active_queues" return nil unless queue_name = get_queue @redis.multi @redis.sadd :active_queues, queue_name @redis.lpop "queue:#{queue_name}" add, value = @redis.exec [queue_name, Resque.decode(value)] ensure @redis.unwatch end |
#queues_regex ⇒ Object
23 24 25 |
# File 'lib/resque/plugins/brokered/broker.rb', line 23 def queues_regex /^(?:#{@queues.join('|')}).*/ end |