Module: ResqueExtension

Defined in:
lib/resque-fanout/resque_extension.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.apply ⇒ Object



3
4
5
# File 'lib/resque-fanout/resque_extension.rb', line 3

# Mixes this module's instance methods into Resque itself, so they can be
# invoked directly on the Resque object.
#
# @return [Object] whatever +Resque.extend+ returns
def self.apply
  Resque.extend(ResqueExtension)
end

Instance Method Details

#exchanges ⇒ Object



39
40
41
42
43
44
45
46
# File 'lib/resque-fanout/resque_extension.rb', line 39

# Lists every name stored in the +:exchanges+ set together with the result
# of +queues_for+ for that name.
#
# @return [Array<Hash>] one hash per exchange, with the keys +:exchange+
#   (the stored name) and +:queues+ (the value of +queues_for+ for it)
def exchanges
  exchange_names = redis.smembers(:exchanges)
  exchange_names.map { |name| { :exchange => name, :queues => queues_for(name) } }
end

#publish(exchange, *args) ⇒ Object



49
50
51
52
53
# File 'lib/resque-fanout/resque_extension.rb', line 49

# Pushes one job onto every queue that +queues_for+ reports for the exchange.
# Each job carries the subscription's +:class+ entry (nil when the
# subscription has none) and the given arguments.
#
# @param exchange [Object] exchange identifier, handed to +queues_for+
# @param args [Array] job arguments, forwarded unchanged to every queue
# @return [Array<Hash>] the subscription list that was iterated; an empty
#   array when +queues_for+ returned nil
def publish(exchange, *args)
  subscriptions = queues_for(exchange) || []
  subscriptions.each do |subscription|
    push(subscription[:queue], :class => subscription[:class], :args => args)
  end
end

#queues_for(exchange) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/resque-fanout/resque_extension.rb', line 28

# Looks up the queues subscribed to an exchange.
#
# Reads the set "exchanges:<exchange>" for the queue names and the hash
# "exchanges:class:<exchange>" for the class recorded against each queue.
#
# @param exchange [#to_s] exchange identifier
# @return [Array<Hash>, nil] one hash per queue, holding +:queue+ and, when
#   a class is recorded for that queue, +:class+; nil when the exchange is
#   not a member of the +:exchanges+ set
def queues_for(exchange)
  name = exchange.to_s
  return nil unless redis.sismember(:exchanges, name)

  class_by_queue = Hash[redis.hgetall("exchanges:class:#{name}")]
  redis.smembers("exchanges:#{name}").map do |queue_name|
    entry = { :queue => queue_name }
    klass = class_by_queue[queue_name]
    entry[:class] = klass if klass
    entry
  end
end

#subscribe(exchange, options = {}) ⇒ Object

Raises:

  • (ArgumentError)


8
9
10
11
12
13
14
15
# File 'lib/resque-fanout/resque_extension.rb', line 8

# Subscribes a queue to an exchange.
#
# Adds the queue to the set "exchanges:<exchange>", records the class for
# that queue in the hash "exchanges:class:<exchange>" when +:class+ is
# given, and finally adds the exchange to the +:exchanges+ set.
#
# @param exchange [#to_s] exchange identifier
# @param options [Hash] subscription options
# @option options [#to_s] :queue queue name; when absent the name is
#   obtained from +queue_from_class(options[:class])+
# @option options [#to_s] :class class recorded against the queue
# @return [Object] the result of the final +redis.sadd+ call
# @raise [ArgumentError] when neither +:queue+ nor +:class+ is given
def subscribe(exchange, options={})
  # Message typo fixed: it previously read "must be supplier".
  raise ArgumentError, "either class or queue param must be supplied" unless options[:queue] || options[:class]

  exchange_name = exchange.to_s
  queue_name = (options[:queue] || queue_from_class(options[:class])).to_s

  redis.sadd("exchanges:#{exchange_name}", queue_name)
  redis.hset("exchanges:class:#{exchange_name}", queue_name, options[:class].to_s) if options[:class]
  # The exchange is registered last, after its queue data has been written.
  redis.sadd(:exchanges, exchange_name)
end

#unsubscribe(exchange, options = {}) ⇒ Object

Raises:

  • (ArgumentError)


17
18
19
20
21
22
23
24
25
26
# File 'lib/resque-fanout/resque_extension.rb', line 17

# Removes a queue's subscription from an exchange.
#
# Deletes the queue from the set "exchanges:<exchange>" and its entry from
# the hash "exchanges:class:<exchange>". When the queue set is then empty,
# the exchange is also removed from the +:exchanges+ set.
#
# @param exchange [#to_s] exchange identifier
# @param options [Hash] subscription options
# @option options [#to_s] :queue queue name; when absent the name is
#   obtained from +queue_from_class(options[:class])+
# @option options [Object] :class class used to derive the queue name
# @return [Object, nil] the result of removing the exchange from
#   +:exchanges+, or nil when the exchange still has queues
# @raise [ArgumentError] when neither +:queue+ nor +:class+ is given
def unsubscribe(exchange, options={})
  # Message typo fixed: it previously read "must be supplier".
  raise ArgumentError, "either class or queue param must be supplied" unless options[:queue] || options[:class]

  exchange_name = exchange.to_s
  queue_name = (options[:queue] || queue_from_class(options[:class])).to_s
  queues_key = "exchanges:#{exchange_name}"

  redis.srem(queues_key, queue_name)
  redis.hdel("exchanges:class:#{exchange_name}", queue_name)
  # Drop the exchange itself once its last queue is gone.
  redis.srem(:exchanges, exchange_name) if redis.scard(queues_key) == 0
end