Class: Bunny::Exchange
- Inherits:
- Object (hierarchy: Object, Bunny::Exchange)
- Defined in:
- lib/bunny/exchange.rb
Overview
Represents AMQP 0.9.1 exchanges.
Instance Attribute Summary
- #channel ⇒ Bunny::Channel readonly
- #name ⇒ String readonly
-
#opts ⇒ Hash
Options hash this exchange instance was instantiated with.
- #status ⇒ Symbol readonly
-
#type ⇒ Symbol readonly
Type of this exchange (one of: :direct, :fanout, :topic, :headers).
Class Method Summary
-
.default(channel_or_connection) ⇒ Exchange
The default exchange.
Instance Method Summary
-
#arguments ⇒ Hash
Additional optional arguments (typically used by RabbitMQ extensions and plugins).
-
#auto_delete? ⇒ Boolean
True if this exchange was declared as automatically deleted (deleted once its last binding is removed).
-
#bind(source, opts = {}) ⇒ Bunny::Exchange
Binds an exchange to another (source) exchange using exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.
-
#delete(opts = {}) ⇒ Object
Deletes the exchange unless it is predeclared.
-
#durable? ⇒ Boolean
True if this exchange was declared as durable (will survive broker restart).
- #handle_return(basic_return, properties, content) ⇒ Object
-
#initialize(channel, type, name, opts = {}) ⇒ Exchange constructor
A new instance of Exchange.
-
#internal? ⇒ Boolean
True if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients).
-
#on_return(&block) ⇒ Object
Defines a block that will handle returned messages.
-
#predefined? ⇒ Boolean
(also: #predeclared?)
True if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
-
#publish(payload, opts = {}) ⇒ Bunny::Exchange
Publishes a message.
- #recover_from_network_failure ⇒ Object
-
#unbind(source, opts = {}) ⇒ Bunny::Exchange
Unbinds an exchange from another (source) exchange using exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.
-
#wait_for_confirms ⇒ Object
Waits until all outstanding publisher confirms on the channel arrive.
Constructor Details
#initialize(channel, type, name, opts = {}) ⇒ Exchange
Returns a new instance of Exchange.
# File 'lib/bunny/exchange.rb', line 74
def initialize(channel, type, name, opts = {})
  @channel = channel
  @name = name
  @type = type
  @options = self.class.(name, opts)

  @durable = @options[:durable]
  @auto_delete = @options[:auto_delete]
  @internal = @options[:internal]
  @arguments = @options[:arguments]

  @bindings = Set.new

  declare! unless opts[:no_declare] || predeclared? || (@name == AMQ::Protocol::EMPTY_STRING)

  @channel.register_exchange(self)
end
Instance Attribute Details
#channel ⇒ Bunny::Channel (readonly)
# File 'lib/bunny/exchange.rb', line 15
def channel
  @channel
end
#name ⇒ String (readonly)
# File 'lib/bunny/exchange.rb', line 18
def name
  @name
end
#opts ⇒ Hash
Options hash this exchange instance was instantiated with.
# File 'lib/bunny/exchange.rb', line 30
def opts
  @opts
end
#status ⇒ Symbol (readonly)
# File 'lib/bunny/exchange.rb', line 26
def status
  @status
end
#type ⇒ Symbol (readonly)
Type of this exchange (one of: :direct, :fanout, :topic, :headers).
# File 'lib/bunny/exchange.rb', line 22
def type
  @type
end
Class Method Details
.default(channel_or_connection) ⇒ Exchange
The default exchange. This exchange is a direct exchange that is predefined by the broker and cannot be removed. Every queue is bound to it by default, with the following routing semantics: a message is routed to the queue whose name equals the message’s routing key. For example, if a message is published with the routing key “weather.usa.ca.sandiego” and a queue with that name exists, the message is routed to that queue.
Note: Do not confuse the default exchange with amq.direct. amq.direct is a pre-defined direct exchange that has no special routing semantics.
# File 'lib/bunny/exchange.rb', line 54
def self.default(channel_or_connection)
  self.new(channel_or_connection, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
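The routing rule described for the default exchange can be illustrated without a broker. The following is a toy in-memory model only, not Bunny or RabbitMQ code: `ToyDefaultExchange` and all of its methods are hypothetical names introduced for this sketch.

```ruby
# Toy in-memory model of default-exchange routing.
# Hypothetical illustration; this is not part of Bunny's API.
class ToyDefaultExchange
  def initialize
    @queues = {} # queue name => array of delivered payloads
  end

  # Declaring a queue implicitly binds it under its own name.
  def declare_queue(name)
    @queues[name] ||= []
  end

  # Route to the queue whose name equals the routing key, if one exists.
  # Returns true when the message was routed, false otherwise.
  def publish(payload, routing_key:)
    queue = @queues[routing_key]
    return false if queue.nil?

    queue << payload
    true
  end

  def messages(name)
    @queues.fetch(name, [])
  end
end

exchange = ToyDefaultExchange.new
exchange.declare_queue("weather.usa.ca.sandiego")

routed   = exchange.publish("sunny", routing_key: "weather.usa.ca.sandiego")
unrouted = exchange.publish("rainy", routing_key: "weather.usa.wa.seattle")
```

In this model the first publish lands in the queue of the same name, while the second finds no matching queue and is not delivered anywhere.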
Instance Method Details
#arguments ⇒ Hash
Returns additional optional arguments (typically used by RabbitMQ extensions and plugins).
# File 'lib/bunny/exchange.rb', line 112
def arguments
  @arguments
end
#auto_delete? ⇒ Boolean
Returns true if this exchange was declared as automatically deleted (deleted once its last binding is removed).
# File 'lib/bunny/exchange.rb', line 100
def auto_delete?
  @auto_delete
end
#bind(source, opts = {}) ⇒ Bunny::Exchange
Binds an exchange to another (source) exchange using exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.
# File 'lib/bunny/exchange.rb', line 174
def bind(source, opts = {})
  @channel.exchange_bind(source, self, opts)
  @bindings.add(source: source, opts: opts)

  self
end
#delete(opts = {}) ⇒ Object
Deletes the exchange unless it is predeclared.
# File 'lib/bunny/exchange.rb', line 155
def delete(opts = {})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, opts) unless predeclared?
end
#durable? ⇒ Boolean
Returns true if this exchange was declared as durable (will survive broker restart).
# File 'lib/bunny/exchange.rb', line 94
def durable?
  @durable
end
#handle_return(basic_return, properties, content) ⇒ Object
# File 'lib/bunny/exchange.rb', line 236
def handle_return(basic_return, properties, content)
  if @on_return
    @on_return.call(basic_return, properties, content)
  else
    # TODO: log a warning
  end
end
#internal? ⇒ Boolean
Returns true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients).
# File 'lib/bunny/exchange.rb', line 106
def internal?
  @internal
end
#on_return(&block) ⇒ Object
Defines a block that will handle returned messages.
# File 'lib/bunny/exchange.rb', line 205
def on_return(&block)
  @on_return = block

  self
end
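The interplay between #on_return and #handle_return can be sketched with a stand-in class that copies only those two methods. `ReturnDemo` and the struct used for the return frame are hypothetical and exist purely for illustration. The reply code 312 with the text NO_ROUTE is the standard AMQP 0.9.1 reply for an unroutable message, but the real object passed by Bunny is not modeled here.

```ruby
# Stand-in that reproduces only the on_return / handle_return pair
# shown in this document (hypothetical class, not Bunny::Exchange).
class ReturnDemo
  # Stores the handler and returns self so calls can be chained.
  def on_return(&block)
    @on_return = block
    self
  end

  # Invokes the stored handler, if any; otherwise does nothing.
  def handle_return(basic_return, properties, content)
    @on_return.call(basic_return, properties, content) if @on_return
  end
end

# Hypothetical stand-in for the returned-message frame.
fake_return_class = Struct.new(:reply_code, :reply_text)

returned = []
demo = ReturnDemo.new.on_return do |basic_return, _properties, content|
  returned << [basic_return.reply_code, content]
end

demo.handle_return(fake_return_class.new(312, "NO_ROUTE"), {}, "hello")

# Without a registered handler the return is silently ignored.
ignored = ReturnDemo.new.handle_return(fake_return_class.new(312, "NO_ROUTE"), {}, "hello")
```

Because on_return returns the receiver, the handler can be registered in the same expression that obtains the exchange.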
#predefined? ⇒ Boolean Also known as: predeclared?
Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
# File 'lib/bunny/exchange.rb', line 245
def predefined?
  (@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i)
end
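To see which names the check in #predefined? accepts, the expression can be lifted into a standalone method. This is a sketch: `predefined_name?` is a hypothetical helper, and it assumes that AMQ::Protocol::EMPTY_STRING is the empty string.

```ruby
# Standalone copy of the name check used by #predefined?.
# Assumption: AMQ::Protocol::EMPTY_STRING equals "".
def predefined_name?(name)
  name == "" || !!(name =~ /^amq\.(direct|fanout|topic|headers|match)/i)
end

names = ["", "amq.direct", "AMQ.Topic", "amq.direct.audit", "amq.rabbitmq.trace", "orders"]
results = names.to_h { |name| [name, predefined_name?(name)] }
```

Two properties follow from the pattern itself: the match is case-insensitive, and it is a prefix match, so a name such as "amq.direct.audit" is also treated as predefined. A name like "amq.rabbitmq.trace" is not, because the segment after "amq." is not one of the listed types.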
#publish(payload, opts = {}) ⇒ Bunny::Exchange
Publishes a message.
# File 'lib/bunny/exchange.rb', line 140
def publish(payload, opts = {})
  @channel.basic_publish(payload, self.name, (opts.delete(:routing_key) || opts.delete(:key)), opts)

  self
end
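The routing-key lookup in #publish has two side effects worth knowing about, which the following sketch isolates. `extract_routing_key` is a hypothetical helper that copies only the expression from the listing.

```ruby
# Mirrors the routing-key lookup in #publish.
# extract_routing_key is a hypothetical helper, not part of Bunny.
def extract_routing_key(opts)
  opts.delete(:routing_key) || opts.delete(:key)
end

# :routing_key is removed from the caller's hash.
opts = { routing_key: "orders.created", persistent: true }
key = extract_routing_key(opts)

# :key is used as a fallback when :routing_key is absent.
alias_opts = { key: "orders.cancelled" }
alias_key = extract_routing_key(alias_opts)

# When both are given, :routing_key wins and :key stays in the hash,
# because || short-circuits before the second delete runs.
both_opts = { routing_key: "primary", key: "secondary" }
both_key = extract_routing_key(both_opts)
```

Since the hash passed in is mutated, a caller that reuses one options hash across several publish calls would lose the routing key after the first call. Passing a fresh hash (or a `dup`) per call avoids that.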
#recover_from_network_failure ⇒ Object
# File 'lib/bunny/exchange.rb', line 222
def recover_from_network_failure
  declare! unless @options[:no_declare] || predefined?

  @bindings.each do |b|
    bind(b[:source], b[:opts])
  end
end
#unbind(source, opts = {}) ⇒ Bunny::Exchange
Unbinds an exchange from another (source) exchange using exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.
# File 'lib/bunny/exchange.rb', line 195
def unbind(source, opts = {})
  @channel.exchange_unbind(source, self, opts)
  @bindings.delete(source: source, opts: opts)

  self
end
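The listings for #bind and #unbind record bindings in a Set of hashes, which #recover_from_network_failure later replays. The sketch below uses only Ruby's standard Set to show what that bookkeeping implies. The string "logs" is a placeholder for a source exchange, and no channel calls are modeled.

```ruby
require "set"

bindings = Set.new
source = "logs" # placeholder for a source exchange or its name

# What #bind records.
bindings.add({ source: source, opts: { routing_key: "error" } })

# What #unbind removes: the entry must match on both source and opts.
# A call with different opts leaves the recorded binding in place.
bindings.delete({ source: source, opts: {} })
size_after_mismatch = bindings.size

# A call with equal source and opts removes it.
bindings.delete({ source: source, opts: { routing_key: "error" } })
size_after_match = bindings.size
```

Under this bookkeeping, an unbind issued with options that differ from the original bind would leave a stale entry that is bound again on recovery, so it seems safest to pass the same options to both calls.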
#wait_for_confirms ⇒ Object
Waits until all outstanding publisher confirms on the channel arrive.
This is a convenience method that delegates to Channel#wait_for_confirms.
# File 'lib/bunny/exchange.rb', line 217
def wait_for_confirms
  @channel.wait_for_confirms
end