Module: ResqueExtension
- Defined in:
- lib/resque-fanout/resque_extension.rb
Class Method Summary collapse
- .apply ⇒ Object
Instance Method Summary collapse
- #exchanges ⇒ Object
- #publish(exchange, *args) ⇒ Object
- #queues_for(exchange) ⇒ Object
- #subscribe(exchange, options = {}) ⇒ Object
- #unsubscribe(exchange, options = {}) ⇒ Object
Class Method Details
.apply ⇒ Object
3 4 5 |
# File 'lib/resque-fanout/resque_extension.rb', line 3

# Extends the Resque module with the methods defined in ResqueExtension,
# making them callable directly on Resque.
def self.apply
  Resque.extend(ResqueExtension)
end
Instance Method Details
#exchanges ⇒ Object
39 40 41 42 43 44 45 46 |
# File 'lib/resque-fanout/resque_extension.rb', line 39

# Lists every member of the :exchanges set, pairing each exchange with
# whatever #queues_for returns for it.
#
# @return [Array<Hash>] one hash per exchange, with :exchange and :queues keys
def exchanges
  names = redis.smembers(:exchanges)
  names.map { |name| { :exchange => name, :queues => queues_for(name) } }
end
#publish(exchange, *args) ⇒ Object
49 50 51 52 53 |
# File 'lib/resque-fanout/resque_extension.rb', line 49

# Pushes one job onto every queue subscribed to +exchange+.
#
# @param exchange the exchange to publish to (looked up via #queues_for)
# @param args arguments forwarded unchanged as the :args of each pushed job
# @return [Array<Hash>] the subscriber entries that were iterated
def publish(exchange, *args)
  # queues_for yields nil for an unknown exchange; treat that as "no subscribers".
  subscribers = queues_for(exchange) || []
  subscribers.each do |subscriber|
    push(subscriber[:queue], :class => subscriber[:class], :args => args)
  end
end
#queues_for(exchange) ⇒ Object
28 29 30 31 32 33 34 35 36 37 |
# File 'lib/resque-fanout/resque_extension.rb', line 28

# Looks up the queues subscribed to +exchange+.
#
# @param exchange the exchange name; converted with #to_s before use
# @return [Array<Hash>, nil] one hash per queue with a :queue key, plus a
#   :class key when a class is recorded for that queue; nil when the exchange
#   is not a member of the :exchanges set
def queues_for(exchange)
  name = exchange.to_s
  return unless redis.sismember(:exchanges, name)

  klasses = Hash[redis.hgetall("exchanges:class:#{name}")]
  redis.smembers("exchanges:#{name}").map do |queue|
    klass = klasses[queue]
    klass ? { :queue => queue, :class => klass } : { :queue => queue }
  end
end
#subscribe(exchange, options = {}) ⇒ Object
8 9 10 11 12 13 14 15 |
# File 'lib/resque-fanout/resque_extension.rb', line 8

# Subscribes a queue to +exchange+.
#
# @param exchange the exchange name; converted with #to_s before use
# @param options [Hash] must contain :queue and/or :class. When :queue is
#   absent the queue name is obtained from queue_from_class(options[:class])
#   (helper defined outside this block).
# @raise [ArgumentError] when neither :queue nor :class is given
# @return the result of the final redis.sadd call
def subscribe(exchange, options = {})
  unless options[:queue] || options[:class]
    raise ArgumentError, "either class or queue param must be supplier"
  end

  queue = options[:queue] || queue_from_class(options[:class])
  redis.sadd("exchanges:#{exchange}", queue.to_s)
  # The class is only recorded when one was explicitly supplied.
  redis.hset("exchanges:class:#{exchange}", queue.to_s, options[:class].to_s) if options[:class]
  # Register the exchange itself so it is visible to lookups on :exchanges.
  redis.sadd(:exchanges, exchange.to_s)
end
#unsubscribe(exchange, options = {}) ⇒ Object
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/resque-fanout/resque_extension.rb', line 17

# Removes a queue's subscription to +exchange+.
#
# @param exchange the exchange name; converted with #to_s before use
# @param options [Hash] must contain :queue and/or :class. When :queue is
#   absent the queue name is obtained from queue_from_class(options[:class])
#   (helper defined outside this block).
# @raise [ArgumentError] when neither :queue nor :class is given
def unsubscribe(exchange, options = {})
  unless options[:queue] || options[:class]
    raise ArgumentError, "either class or queue param must be supplier"
  end

  queue = options[:queue] || queue_from_class(options[:class])
  redis.srem("exchanges:#{exchange}", queue.to_s)
  redis.hdel("exchanges:class:#{exchange}", queue.to_s)

  # Once the last subscriber is gone, drop the exchange from the :exchanges set.
  redis.srem(:exchanges, exchange.to_s) if redis.scard("exchanges:#{exchange}").zero?
end