Class: Ragnar::Exchange

Inherits:
Object
  • Object
show all
Defined in:
lib/ragnar/exchange.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type, name, opts = {}) ⇒ Exchange

Returns a new instance of Exchange.



7
8
9
# File 'lib/ragnar/exchange.rb', line 7

# Records the exchange type, its name and an options Hash on the instance.
# Only the three instance variables are assigned here; nothing else is
# touched by the constructor.
#
# type - stored as @type
# name - stored as @name
# opts - stored as @options (defaults to an empty Hash)
def initialize(type, name, opts = {})
  @type = type
  @name = name
  @options = opts
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



4
5
6
# File 'lib/ragnar/exchange.rb', line 4

# Reader for the @channel instance variable (nil while it is unassigned).
def channel; @channel; end

#exchange ⇒ Object (readonly)

Returns the value of attribute exchange.



4
5
6
# File 'lib/ragnar/exchange.rb', line 4

# Reader for the @exchange instance variable (nil while it is unassigned).
def exchange; @exchange; end

#name ⇒ Object (readonly)

Returns the value of attribute name.



4
5
6
# File 'lib/ragnar/exchange.rb', line 4

# Reader for the @name instance variable, which the constructor sets from
# its +name+ argument.
def name; @name; end

#options ⇒ Object (readonly)

Returns the value of attribute options.



4
5
6
# File 'lib/ragnar/exchange.rb', line 4

# Reader for the @options instance variable, which the constructor sets from
# its +opts+ argument.
def options; @options; end

#queue_prefix ⇒ Object

Returns the value of attribute queue_prefix.



5
6
7
# File 'lib/ragnar/exchange.rb', line 5

# Reader for the @queue_prefix instance variable (nil while it is unassigned).
# #subscribe prepends a non-nil prefix, followed by a dot, to plain-string
# queue names.
def queue_prefix; @queue_prefix; end

#type ⇒ Object (readonly)

Returns the value of attribute type.



4
5
6
# File 'lib/ragnar/exchange.rb', line 4

# Reader for the @type instance variable, which the constructor sets from
# its +type+ argument.
def type; @type; end

Instance Method Details

#publish(routing_key, message, opts = {}) ⇒ Object



11
12
13
14
15
16
17
# File 'lib/ragnar/exchange.rb', line 11

# Publishes +message+ to the exchange under +routing_key+.
#
# All work happens inside the block handed to EM.schedule: a channel/exchange
# pair is obtained from setup_connection, the queue named after @name is
# bound to that exchange, and then the message is published.
#
# routing_key - merged into +opts+ as :routing_key for both the bind and the
#               publish call; it overrides any :routing_key already in +opts+
# message     - passed through untouched to the exchange's #publish
# opts        - extra options forwarded to both the bind and the publish call
#
# Returns whatever EM.schedule returns.
def publish(routing_key, message, opts = {})
  EM.schedule do
    chan, exch = setup_connection
    routed = { :routing_key => routing_key }

    target_queue = chan.queue(@name)
    # Each callee receives its own freshly merged Hash.
    target_queue.bind(exch, opts.merge(routed))
    exch.publish(message, opts.merge(routed))
  end
end

#subscribe(name, subscribe_opts = {}, &block) ⇒ Object

Takes a subscription key or queue/routing options

exchange.subscribe('the.key') # => queue name and routing key are 'the.key'
exchange.subscribe(:queue => 'my.queue', :routing_key => 'message.*.pattern')


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/ragnar/exchange.rb', line 24

# Binds a queue to the exchange and subscribes +block+ to it.
#
# +name+ is either a plain key or a Hash of queue/routing options:
#
#   exchange.subscribe('the.key') # => queue name and routing key are 'the.key'
#   exchange.subscribe(:queue => 'my.queue', :routing_key => 'message.*.pattern')
#
# Plain key: the routing key is the key itself; the queue name is the key,
# prefixed with "<queue_prefix>." whenever queue_prefix is non-nil.
# Hash: :queue is used verbatim as the queue name (queue_prefix is not
# applied) and :routing_key is passed along as given, nil when absent.
#
# subscribe_opts and +block+ are forwarded unchanged to the bound queue's
# #subscribe. Argument validation runs immediately; the queue/bind/subscribe
# calls run inside the block handed to EM.schedule.
#
# Raises RuntimeError ('Invalid queue name') when the Hash has no truthy
# :queue, or when a plain name is nil or only whitespace.
def subscribe(name, subscribe_opts = {}, &block)
  case name
  when Hash
    queue_name = name[:queue] || raise('Invalid queue name')
    routing_key = name[:routing_key]
  else
    blank = name.nil? || name.strip.empty?
    raise 'Invalid queue name' if blank

    prefix = queue_prefix
    queue_name = prefix.nil? ? name : format('%s.%s', prefix, name)
    routing_key = name
  end

  EM.schedule do
    chan, exch = setup_connection
    declared = chan.queue(queue_name)
    bound = declared.bind(exch, :routing_key => routing_key)
    bound.subscribe(subscribe_opts, &block)
  end
end