Class: Marconi::Exchange

Inherits:
Object
  • Object
show all
Defined in:
lib/marconi/exchange.rb

Constant Summary collapse

DEFAULT_PUBLISH_OPTIONS =
{
  # Tells the server how to react if the message cannot be routed to a queue.
  # If set to true, the server will return an unroutable message with a Return method.
  # If set to false, the server silently drops the message.
  :mandatory => true,

  # Tells the server how to react if the message cannot be routed to a
  # queue consumer immediately.  If set to true, the server will return an
  # undeliverable message with a Return method.  If set to false, the
  # server will queue the message, but with no guarantee that it will ever
  # be consumed.
  :immediate => false,

  # Tells the server whether to persist the message.  If set to true, the
  # message will be persisted to disk and not lost if the server restarts.
  # Setting to true incurs a performance penalty as there is an extra cost
  # associated with disk access.
  :persistent => true
}

Instance Method Summary collapse

Constructor Details

#initialize(exchange_name) ⇒ Exchange

Returns a new instance of Exchange.



6
7
8
# File 'lib/marconi/exchange.rb', line 6

def initialize(exchange_name)
  @exchange_name = exchange_name
end

Instance Method Details

#bunny_paramsObject



22
23
24
# File 'lib/marconi/exchange.rb', line 22

def bunny_params
  Marconi.config.bunny_params
end

#exchange_nameObject



10
11
12
# File 'lib/marconi/exchange.rb', line 10

def exchange_name
  @exchange_name
end

#keepaliveObject



18
19
20
# File 'lib/marconi/exchange.rb', line 18

def keepalive
  Marconi.config.keepalive
end

#nameObject



14
15
16
# File 'lib/marconi/exchange.rb', line 14

def name
  Marconi.config.name
end

#nuke_q(q_name) ⇒ Object

Example: Marconi.inbound.nuke_q(‘foo_q’) Use judiciously - this tosses all messages in the Q and nukes it



95
96
97
# File 'lib/marconi/exchange.rb', line 95

def nuke_q(q_name)
  generic_nuke(:queue, q_name) 
end

#pop(q_name, options = {}) ⇒ Object

Example: Marconi.inbound.pop(‘foo_q’, :key => ‘deals.member.*’)



73
74
75
76
77
# File 'lib/marconi/exchange.rb', line 73

def pop(q_name, options = {})
  q, key = get_q(q_name, options)
  msg = q.pop[:payload]
  msg == :queue_empty ? nil : msg
end

#publish(msg, options = {}) ⇒ Object

Example: Marconi.inbound.publish(“Howdy!”, :topic => ‘deals.member.create’)



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/marconi/exchange.rb', line 47

def publish(msg, options = {})
  topic = ensure_valid_publish_topic(options)
  begin
    connect
    @exchange.publish(msg, DEFAULT_PUBLISH_OPTIONS.merge(:key => topic))
    retmsg = @bunny.returned_message
    raise "Invalid return payload: #{retmsg[:payload]}" unless retmsg[:payload] == :no_return
    true
  rescue Exception => e
    Marconi.log(e)
    if Marconi.backup_queue_class && !options[:recovering]
      Marconi.backup_queue_class.create!(:exchange_name => exchange_name,
                                         :topic => topic,
                                         :body => msg)
    end
    false
  end
end

#purge_q(q_name) ⇒ Object

Example: Marconi.inbound.purge_q(‘foo_q’) Use judiciously - this tosses all messages in the Q!



81
82
83
84
85
86
87
88
89
90
91
# File 'lib/marconi/exchange.rb', line 81

def purge_q(q_name)
  connect
  unless q_name.blank?
    if o = exists?(:queue, q_name)
      o.purge
    end
  end
rescue Bunny::ForcedChannelCloseError
  connect(true) # Connection is fucked after this error, so it must be refreshed
  raise
end

#subscribe(q_name, options = {}, &block) ⇒ Object

Example: Marconi.inbound.subscribe(‘foo_q’, :key => ‘deals.member.*’) { |msg| puts msg }



67
68
69
70
# File 'lib/marconi/exchange.rb', line 67

def subscribe(q_name, options = {}, &block)
  q, key = get_q(q_name, options)
  q.subscribe(options, &block)
end