Class: AMQP::Exchange
- Inherits:
-
AMQ::Client::Exchange
- Object
- AMQ::Client::Exchange
- AMQP::Exchange
- Defined in:
- lib/amqp/exchange.rb
Overview
Please make sure you read the section on exchange durability vs. message persistence.
What are AMQP exchanges?
An AMQP exchange is where AMQP clients send messages. An exchange may also be described as a router or a matcher. Every published message is received by an exchange which, depending on its type and the message attributes, determines how to deliver the message.
Entities that forward messages to consumers (or from which consumers fetch messages on demand) are called queues. Exchanges are associated with queues via bindings. Roughly speaking, bindings determine which queues receive the messages published to a given exchange.
AMQP bindings
Closely related to exchanges is the concept of bindings. A binding is the relationship between an exchange and a message queue that tells the exchange how to route messages. Bindings are set up by AMQP applications (usually the application that owns and uses the message queue sets up bindings for it). An exchange may be bound to zero, one, or many queues.
Exchange types
There are 4 supported exchange types: direct, fanout, topic and headers. The exchange type determines how the exchange processes and routes messages.
Direct exchanges
Direct exchanges are useful for 1:1 communication scenarios. Queues are bound to direct exchanges with a parameter called “routing key”. When a message arrives at a direct exchange, the broker takes that message’s routing key (if any), finds a queue bound to the exchange with the same routing key and routes the message there.
Because queues are very often bound with a routing key equal to the queue’s name, AMQP 0.9.1 has a pre-declared direct exchange known as the default exchange. The default exchange is a bit special: the broker automatically binds all queues (in the same virtual host) to it with a routing key equal to the queue name. In other words, messages delivered to the default exchange are routed to the queue whose name equals the message routing key. The default exchange’s name is an empty string.
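The routing rule described above can be pictured with a small, broker-free model. This is only a sketch in plain Ruby: it does not use the amqp gem, and `ToyDirectExchange` with its `bind` and `route` methods are hypothetical names invented for illustration.

```ruby
# Toy model of direct-exchange routing (hypothetical class, not amqp gem API).
class ToyDirectExchange
  def initialize
    # routing key => names of the queues bound with that key
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Record that +queue_name+ is bound with +routing_key+.
  def bind(queue_name, routing_key)
    @bindings[routing_key] << queue_name
    self
  end

  # Return the names of the queues a message with +routing_key+ would reach.
  def route(routing_key)
    @bindings.fetch(routing_key, []).dup
  end
end

# The default exchange behaves as if every queue were bound
# with a routing key equal to its own name.
default_exchange = ToyDirectExchange.new
%w[images.resize reports].each { |queue| default_exchange.bind(queue, queue) }

p default_exchange.route("reports")  # the queue named "reports"
p default_exchange.route("missing")  # no queue has this name
```

A message whose routing key matches no binding is simply not routed anywhere in this model; what a real broker does with such a message depends on publishing options that the sketch does not cover.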
As part of the standard, the server must predeclare the direct exchange ‘amq.direct’ and the fanout exchange ‘amq.fanout’ (all exchange names starting with ‘amq.’ are reserved). Attempts to declare an exchange using ‘amq.’ as the name will raise an AMQP::Error and fail. In practice these predeclared exchanges are never used directly by client code.
Fanout exchanges
Fanout exchanges are useful for 1:n and n:m communication where one or more producers feed multiple consumers. Messages published to a fanout exchange are delivered unconditionally to all queues bound to that exchange. Each queue gets its own copy of the message.
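As a rough illustration of "each queue gets its own copy", here is a minimal in-memory model. It is a sketch only: `ToyFanoutExchange` and its methods are hypothetical and unrelated to the amqp gem API, and queues are represented as plain arrays.

```ruby
# Toy model of fanout routing (hypothetical class, not amqp gem API).
class ToyFanoutExchange
  def initialize
    @queues = {} # queue name => messages delivered so far
  end

  # Bind a queue; fanout bindings carry no routing key.
  def bind(queue_name)
    @queues[queue_name] ||= []
    self
  end

  # Deliver a separate copy of the payload to every bound queue.
  # The routing key is accepted but ignored.
  def publish(payload, routing_key = "")
    @queues.each_value { |messages| messages << payload.dup }
    self
  end

  def messages_in(queue_name)
    @queues.fetch(queue_name, [])
  end
end

exchange = ToyFanoutExchange.new
exchange.bind("audit").bind("email")
exchange.publish("user.created", "ignored.key")

p exchange.messages_in("audit")
p exchange.messages_in("email")
```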
Topic exchanges
Topic exchanges are used for 1:n and n:m communication scenarios. An exchange of this type uses the routing key to determine which queues to deliver the message to. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.
As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’.
The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple may look like stock.us.aapl. NASDAQ updates may use topic stock.us.nasdaq, while DAX may use stock.de.dax.
When publishing data to the exchange, bound queues subscribing to the exchange indicate which data interests them by passing a routing key for matching against the published routing key.
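The wildcard matching mentioned above follows the AMQP 0.9.1 topic rules: in a binding pattern, `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below is one possible way to express those rules in plain Ruby; `topic_match?` and `match_words` are hypothetical helpers, not part of the amqp gem, and a broker is free to implement matching differently.

```ruby
# Returns true when +routing_key+ matches +pattern+ under AMQP topic rules:
# "*" matches exactly one word, "#" matches zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

# Recursive helper that compares the two word lists position by position.
def match_words(pattern_words, key_words)
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # "#" may consume zero, one, or many words; try every split point.
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when "*"
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

p topic_match?("stock.us.*", "stock.us.aapl") # one word after "stock.us"
p topic_match?("stock.#", "stock.de.dax")     # any number of words after "stock"
p topic_match?("stock.*", "stock.de.dax")     # "*" cannot cover two words
```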
Headers exchanges
When publishing data to an exchange of type headers, bound queues subscribing to the exchange indicate which data interests them by passing arguments for matching against the headers in published messages. The form of the matching can be controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified, it defaults to ‘all’.
A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers), while a value of ‘any’ implies that at least one should match (i.e. it does an OR).
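The AND/OR behaviour of ‘x-match’ can be sketched as a small predicate. This is a simplified model in plain Ruby, assuming binding arguments and message headers are both hashes with string keys; `headers_match?` is a hypothetical helper, not an amqp gem method, and it ignores any broker-specific rules.

```ruby
# Returns true when message +headers+ satisfy the binding +arguments+.
# "x-match" defaults to "all", as described above.
def headers_match?(arguments, headers)
  mode     = arguments.fetch("x-match", "all")
  expected = arguments.reject { |key, _value| key == "x-match" }

  case mode
  when "all" # AND: every expected header must be present with the same value
    expected.all? { |key, value| headers.key?(key) && headers[key] == value }
  when "any" # OR: one matching header is enough
    expected.any? { |key, value| headers.key?(key) && headers[key] == value }
  else
    raise ArgumentError, "unknown x-match value: #{mode.inspect}"
  end
end

binding_args = { "format" => "pdf", "type" => "report" }
message      = { "format" => "pdf", "type" => "log" }

p headers_match?(binding_args, message)                           # default "all"
p headers_match?(binding_args.merge("x-match" => "any"), message) # "any"
```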
As part of the AMQP standard, each server should predeclare a headers exchange named ‘amq.match’.
Key methods
Key methods of Exchange class are
Exchange durability and persistence of messages.
Learn more in our Durability guide.
RabbitMQ extensions.
The amqp gem supports several RabbitMQ extensions that extend Exchange functionality. Learn more in VendorSpecificExtensions.
Direct Known Subclasses
Constant Summary
- DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze
Instance Attribute Summary
- #default_routing_key ⇒ String (also: #key) readonly
- #name ⇒ String readonly
-
#on_declare ⇒ #call
A callback that is executed once declaration notification (exchange.declare-ok) from the broker arrives.
-
#opts ⇒ Hash
Options hash this exchange instance was instantiated with.
- #status ⇒ Symbol readonly
-
#type ⇒ Symbol
readonly
Type of this exchange (one of: :direct, :fanout, :topic, :headers).
Class Method Summary
-
.default(channel = nil) ⇒ Exchange
The default exchange.
Instance Method Summary
-
#auto_deleted? ⇒ Boolean
(also: #auto_deletable?)
True if this exchange is automatically deleted when it is no longer used.
- #callback ⇒ #call (deprecated)
- #channel ⇒ Channel
-
#delete(opts = {}, &block) ⇒ NilClass
This method deletes an exchange.
-
#durable? ⇒ Boolean
True if this exchange is durable.
-
#initialize(channel, type, name, opts = {}) {|exchange, declare_ok| ... } ⇒ Exchange
constructor
See the Exchange class documentation for an introduction, information about exchange types, the use cases they are good for, and so on.
-
#publish(payload, options = {}, &block) ⇒ Exchange
Publishes message to the exchange.
-
#reset ⇒ Object
Resets exchange state.
-
#transient? ⇒ Boolean
(also: #temporary?)
True if this exchange is transient (non-durable).
Constructor Details
#initialize(channel, type, name, opts = {}) {|exchange, declare_ok| ... } ⇒ Exchange
See the Exchange class documentation for an introduction, information about exchange types, the use cases they are good for, and so on.
Predeclared exchanges
If the exchange name corresponds to one of those predeclared by the AMQP 0.9.1 specification (empty string, amq.direct, amq.fanout, amq.topic, amq.match), the declaration command won’t be sent to the broker (because the only possible reply from the broker is to reject it; predefined entities cannot be changed). The callback, if any, will be executed immediately.
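The guard that decides whether a declaration is sent can be isolated as a small predicate. The sketch below copies the condition from the constructor source in this section; `skips_declaration?` is a hypothetical helper name used only for illustration and does not exist in the gem.

```ruby
# Mirrors the guard in the constructor source: no declaration is sent when the
# name is "amq.<type>", when the name is empty, or when :no_declare is set.
def skips_declaration?(type, name, opts = {})
  name == "amq.#{type}" || name.empty? || !!opts[:no_declare]
end

p skips_declaration?(:direct, "amq.direct")              # predeclared name
p skips_declaration?(:direct, "")                        # default exchange
p skips_declaration?(:topic, "events", :no_declare => true)
p skips_declaration?(:topic, "events")                   # ordinary exchange
```

Read literally, the guard compares the name against "amq.<type>", so ‘amq.match’ with type :headers would not be caught by the name check in this sketch. Treat that as an observation about the condition as printed, not as a statement about broker behaviour.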
# File 'lib/amqp/exchange.rb', line 299
def initialize(channel, type, name, opts = {}, &block)
  @channel             = channel
  @type                = type
  @opts                = self.class.(type, name, opts, block)
  @default_routing_key = opts[:routing_key] || opts[:key]
  @name                = name unless name.empty?
  @status              = :unknown
  @default_publish_options = (opts.delete(:default_publish_options) || {
    :routing_key => AMQ::Protocol::EMPTY_STRING,
    :mandatory   => false,
    :immediate   => false
  }).freeze

  @default_headers = (opts.delete(:default_headers) || {
    :content_type => DEFAULT_CONTENT_TYPE,
    :persistent   => false,
    :priority     => 0
  }).freeze

  super(channel.connection, channel, name, type)

  # The AMQP 0.8 specification (as well as 0.9.1) in 1.1.4.2 mentiones
  # that Exchange.Declare-Ok confirms the name of the exchange (because
  # of automaticallynamed), which is logical to interpret that this
  # functionality should be the same as for Queue (though it isn't
  # explicitely told in the specification). In fact, RabbitMQ (and
  # probably other implementations as well) doesn't support it and
  # there is a default exchange with an empty name (so-called default
  # or nameless exchange), so if we'd send Exchange.Declare(exchange=""),
  # then RabbitMQ interpret it as if we'd try to redefine this default
  # exchange so it'd produce an error.
  unless name == "amq.#{type}" or name.empty? or opts[:no_declare]
    @status = :opening

    unless @opts[:no_declare]
      @channel.once_open do
        if block
          shim = Proc.new do |exchange, declare_ok|
            case block.arity
            when 1 then block.call(exchange)
            else
              block.call(exchange, declare_ok)
            end
          end

          self.declare(passive = @opts[:passive], durable = @opts[:durable], auto_delete = @opts[:auto_delete], nowait = @opts[:nowait], @opts[:arguments], &shim)
        else
          self.declare(passive = @opts[:passive], durable = @opts[:durable], auto_delete = @opts[:auto_delete], nowait = @opts[:nowait], @opts[:arguments])
        end
      end
    end
  else
    # Call the callback immediately, as given exchange is already
    # declared.
    @status = :opened
    block.call(self) if block
  end

  @on_declare = block
end
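The constructor wraps the user's block in a shim so that blocks declaring one parameter and blocks declaring two can both be invoked with `(exchange, declare_ok)`. The following standalone sketch reproduces that arity dispatch outside the gem; `arity_shim` is a hypothetical helper name, and symbols stand in for the real exchange and declare-ok objects.

```ruby
# Wrap +block+ so it can always be called with two arguments; only the first
# is forwarded when the block declares exactly one parameter.
def arity_shim(block)
  Proc.new do |exchange, declare_ok|
    case block.arity
    when 1 then block.call(exchange)
    else        block.call(exchange, declare_ok)
    end
  end
end

# Lambdas check their argument count strictly, which makes the dispatch visible.
one_arg = arity_shim(lambda { |exchange| [exchange] })
two_arg = arity_shim(lambda { |exchange, declare_ok| [exchange, declare_ok] })

p one_arg.call(:exchange, :declare_ok) # only the exchange is forwarded
p two_arg.call(:exchange, :declare_ok) # both values are forwarded
```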
Instance Attribute Details
#default_routing_key ⇒ String (readonly) Also known as: key
# File 'lib/amqp/exchange.rb', line 195
def default_routing_key
  @default_routing_key
end
#name ⇒ String (readonly)
# File 'lib/amqp/exchange.rb', line 176
def name
  @name
end
#on_declare ⇒ #call
Returns a callback that is executed once declaration notification (exchange.declare-ok) from the broker arrives.
# File 'lib/amqp/exchange.rb', line 192
def on_declare
  @on_declare
end
#opts ⇒ Hash
Options hash this exchange instance was instantiated with.
# File 'lib/amqp/exchange.rb', line 188
def opts
  @opts
end
#status ⇒ Symbol (readonly)
# File 'lib/amqp/exchange.rb', line 184
def status
  @status
end
#type ⇒ Symbol (readonly)
Type of this exchange (one of: :direct, :fanout, :topic, :headers).
# File 'lib/amqp/exchange.rb', line 180
def type
  @type
end
Class Method Details
.default(channel = nil) ⇒ Exchange
Do not confuse the default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn’t have any special routing semantics.
The default exchange. The default exchange is a predefined direct exchange that cannot be removed. Every queue is bound to this (direct) exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message’s routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue Q with this name, that message will be routed to Q.
# File 'lib/amqp/exchange.rb', line 170
def self.default(channel = nil)
  self.new(channel || AMQP::Channel.new, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
Instance Method Details
#auto_deleted? ⇒ Boolean Also known as: auto_deletable?
Returns true if this exchange is automatically deleted when it is no longer used.
# File 'lib/amqp/exchange.rb', line 539
def auto_deleted?
  !!@opts[:auto_delete]
end
#callback ⇒ #call
Compatibility alias for #on_declare.
# File 'lib/amqp/exchange.rb', line 203
def callback
  @on_declare
end
#delete(opts = {}, &block) ⇒ NilClass
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are deleted, too. Further attempts to publish messages to a deleted exchange will result in a channel-level exception.
# File 'lib/amqp/exchange.rb', line 511
def delete(opts = {}, &block)
  @channel.once_open do
    super(opts.fetch(:if_unused, false), opts.fetch(:nowait, false), &block)
  end

  # backwards compatibility
  nil
end
#durable? ⇒ Boolean
Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.
Returns true if this exchange is durable.
# File 'lib/amqp/exchange.rb', line 524
def durable?
  !!@opts[:durable]
end
#publish(payload, options = {}, &block) ⇒ Exchange
The optional callback this method takes DOES NOT OFFER ANY GUARANTEES ABOUT DATA DELIVERY and must not be used as a “delivery callback”. The only way to guarantee delivery in a distributed environment is to use an acknowledgement mechanism, such as AMQP transactions or the lightweight “publisher confirms” RabbitMQ extension supported by the amqp gem. See the Durability and message persistence and Working With Exchanges guides for details.
Please make sure you read the Durability and message persistence guide, which covers exchange durability vs. message persistence.
Publishes a message to the exchange. The message will be routed to queues by the exchange and distributed to any active consumers. Routing logic is determined by the exchange type and configuration as well as message attributes (like :routing_key or message headers).
Published data is opaque and not modified by the Ruby amqp gem in any way. Serialization of data with JSON, Thrift, BSON or similar libraries before publishing is very common.
Data serialization
Note that this method calls #to_s on the payload argument value. Applications are encouraged to serialize data before publishing (using JSON, Thrift, Protocol Buffers or another serialization library). Note that because AMQP is a binary protocol, text formats like JSON largely lose their strong point of being easy to inspect as data travels across the network, so BSON may be a good fit.
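To see why explicit serialization matters, compare what the implicit #to_s call produces for a Hash with an explicit JSON payload. This sketch uses only Ruby's standard `json` library and never touches a broker; `serialize_payload` is a hypothetical helper, and JSON is just one of the formats mentioned above.

```ruby
require "json"

# Hypothetical helper: build an explicit JSON payload instead of relying
# on the implicit #to_s call made when publishing.
def serialize_payload(data)
  JSON.generate(data)
end

event = { "city" => "San Diego", "temp_f" => 71 }

puts event.to_s               # Ruby-specific text, awkward for other consumers
puts serialize_payload(event) # portable JSON text

# A consumer can rebuild the original structure from the JSON payload.
p JSON.parse(serialize_payload(event)) == event
```

The string returned by `serialize_payload` is what an application would hand to #publish as the payload, so the #to_s call becomes a no-op on an already serialized String.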
Publishing and message persistence
In cases when your application cannot afford to lose a message, AMQP 0.9.1 has several features to offer:
- Persistent messages
- Message acknowledgements
- Transactions
- (a RabbitMQ-specific extension) Publisher confirms
This is a broad topic and we dedicate a separate guide, Durability and message persistence, to it.
Publishing callback and guarantees it DOES NOT offer
The exact moment when a message is published is not determined and depends on many factors, including the machine’s networking stack configuration, so the (optional) block this method takes is scheduled for the next event loop tick, and data is staged for delivery for the current event loop tick. For most applications, this is good enough. The only way to guarantee a message was delivered in a distributed system is to ask a peer to send you a message back. RabbitMQ
Event loop blocking
When intermixing publishing of many messages with other workloads that may take some time, event loop blocking may affect performance. There are several ways to avoid it:
- Run EventMachine in a separate thread.
- Use EventMachine.next_tick.
- Use EventMachine.defer to offload operation to EventMachine thread pool.
TBD: this subject is worth a separate guide
Sending one-off messages
If you need to send a one-off message and then stop the event loop, pass a block to #publish that will be executed after the message is pushed down the network stack, and use Session#disconnect to properly tear down the AMQP connection (see the example under the Examples section below).
# File 'lib/amqp/exchange.rb', line 476
def publish(payload, options = {}, &block)
  opts = @default_publish_options.merge(options)

  @channel.once_open do
    properties = @default_headers.merge(options)
    properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
    super(payload.to_s, opts[:key] || opts[:routing_key] || @default_routing_key, properties, opts[:mandatory], opts[:immediate])

    # don't pass block to AMQ::Client::Exchange#publish because it will be executed
    # immediately and we want to do it later. See ruby-amqp/amqp/#67 MK.
    EventMachine.next_tick(&block) if block
  end

  self
end
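The property handling in the #publish source can be tried in isolation. The sketch below assumes the hash merged into the default headers is the caller's options hash, and it copies the default values from the constructor source; `SKETCH_DEFAULT_HEADERS` and `message_properties` are hypothetical names, not gem API.

```ruby
# Defaults copied from the constructor source (the constant name is hypothetical).
SKETCH_DEFAULT_HEADERS = {
  :content_type => "application/octet-stream",
  :persistent   => false,
  :priority     => 0
}.freeze

# Merge caller options over the defaults and translate :persistent into the
# AMQP delivery_mode property (2 = persistent, 1 = transient).
def message_properties(options = {})
  properties = SKETCH_DEFAULT_HEADERS.merge(options)
  properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
  properties
end

p message_properties({})
p message_properties(:persistent => true, :content_type => "application/json")
```

Note that the :persistent key is removed from the resulting hash: only the numeric delivery mode is passed on.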
#reset ⇒ Object
Resets exchange state. Useful for error handling.
# File 'lib/amqp/exchange.rb', line 546
def reset
  initialize(@channel, @type, @name, @opts)
end
#transient? ⇒ Boolean Also known as: temporary?
Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.
Returns true if this exchange is transient (non-durable).
# File 'lib/amqp/exchange.rb', line 532
def transient?
  !self.durable?
end