Class: Marconi::Exchange
- Inherits:
-
Object
- Object
- Marconi::Exchange
- Defined in:
- lib/marconi/exchange.rb
Constant Summary collapse
- DEFAULT_PUBLISH_OPTIONS =
{ # Tells the server how to react if the message cannot be routed to a queue. # If set to true, the server will return an unroutable message with a Return method. # If set to false, the server silently drops the message. :mandatory => true, # Tells the server how to react if the message cannot be routed to a # queue consumer immediately. If set to true, the server will return an # undeliverable message with a Return method. If set to false, the # server will queue the message, but with no guarantee that it will ever # be consumed. :immediate => false, # Tells the server whether to persist the message. If set to true, the # message will be persisted to disk and not lost if the server restarts. # Setting to true incurs a performance penalty as there is an extra cost # associated with disk access. :persistent => true }
Instance Method Summary collapse
- #bunny_params ⇒ Object
- #exchange_name ⇒ Object
-
#initialize(exchange_name) ⇒ Exchange
constructor
A new instance of Exchange.
- #keepalive ⇒ Object
- #name ⇒ Object
-
#nuke_q(q_name) ⇒ Object
Example: Marconi.inbound.nuke_q(‘foo_q’) Use judiciously - this tosses all messages in the Q and nukes it.
-
#pop(q_name, options = {}) ⇒ Object
Example: Marconi.inbound.pop(‘foo_q’, :key => ‘deals.member.*’).
-
#publish(msg, options = {}) ⇒ Object
Example: Marconi.inbound.publish(“Howdy!”, :topic => ‘deals.member.create’).
-
#purge_q(q_name) ⇒ Object
Example: Marconi.inbound.purge_q(‘foo_q’) Use judiciously - this tosses all messages in the Q!.
-
#subscribe(q_name, options = {}, &block) ⇒ Object
Example: Marconi.inbound.subscribe(‘foo_q’, :key => ‘deals.member.*’) { |msg| puts msg }.
Constructor Details
#initialize(exchange_name) ⇒ Exchange
Returns a new instance of Exchange.
6 7 8 |
# File 'lib/marconi/exchange.rb', line 6 def initialize(exchange_name) @exchange_name = exchange_name end |
Instance Method Details
#bunny_params ⇒ Object
22 23 24 |
# File 'lib/marconi/exchange.rb', line 22 def bunny_params Marconi.config.bunny_params end |
#exchange_name ⇒ Object
10 11 12 |
# File 'lib/marconi/exchange.rb', line 10 def exchange_name @exchange_name end |
#keepalive ⇒ Object
18 19 20 |
# File 'lib/marconi/exchange.rb', line 18 def keepalive Marconi.config.keepalive end |
#nuke_q(q_name) ⇒ Object
Example: Marconi.inbound.nuke_q(‘foo_q’) Use judiciously - this tosses all messages in the Q and nukes it
95 96 97 |
# File 'lib/marconi/exchange.rb', line 95 def nuke_q(q_name) generic_nuke(:queue, q_name) end |
#pop(q_name, options = {}) ⇒ Object
Example: Marconi.inbound.pop(‘foo_q’, :key => ‘deals.member.*’)
73 74 75 76 77 |
# File 'lib/marconi/exchange.rb', line 73 def pop(q_name, = {}) q, key = get_q(q_name, ) msg = q.pop[:payload] msg == :queue_empty ? nil : msg end |
#publish(msg, options = {}) ⇒ Object
Example: Marconi.inbound.publish(“Howdy!”, :topic => ‘deals.member.create’)
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/marconi/exchange.rb', line 47 def publish(msg, = {}) topic = ensure_valid_publish_topic() begin connect @exchange.publish(msg, DEFAULT_PUBLISH_OPTIONS.merge(:key => topic)) retmsg = @bunny. raise "Invalid return payload: #{retmsg[:payload]}" unless retmsg[:payload] == :no_return true rescue Exception => e Marconi.log(e) if Marconi.backup_queue_class && ![:recovering] Marconi.backup_queue_class.create!(:exchange_name => exchange_name, :topic => topic, :body => msg) end false end end |
#purge_q(q_name) ⇒ Object
Example: Marconi.inbound.purge_q(‘foo_q’) Use judiciously - this tosses all messages in the Q!
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/marconi/exchange.rb', line 81 def purge_q(q_name) connect unless q_name.blank? if o = exists?(:queue, q_name) o.purge end end rescue Bunny::ForcedChannelCloseError connect(true) # Connection is fucked after this error, so it must be refreshed raise end |
#subscribe(q_name, options = {}, &block) ⇒ Object
Example: Marconi.inbound.subscribe(‘foo_q’, :key => ‘deals.member.*’) { |msg| puts msg }
67 68 69 70 |
# File 'lib/marconi/exchange.rb', line 67 def subscribe(q_name, = {}, &block) q, key = get_q(q_name, ) q.subscribe(, &block) end |