Class: AMQP::Exchange
- Inherits:
- Object
- AMQP::Exchange
- Extended by:
- ProtocolMethodHandlers
- Includes:
- Entity
- Defined in:
- lib/amqp/exchange.rb
Overview
Please make sure you read the section on exchange durability vs. message persistence.
What are AMQP exchanges?
An AMQP exchange is where AMQP clients send messages. An exchange may also be described as a router or a matcher. Every published message is received by an exchange which, depending on its type and the message attributes, determines how to deliver the message.
Entities that forward messages to consumers (or that consumers fetch messages from on demand) are called queues. Exchanges are associated with queues via bindings. Roughly speaking, bindings determine which queues receive the messages published to a given exchange.
AMQP bindings
Closely related to exchanges is the concept of bindings. A binding is the relationship between an exchange and a message queue that tells the exchange how to route messages. Bindings are set up by AMQP applications (usually the application that owns and uses the message queue sets up bindings for it). An exchange may be bound to zero, one or more queues.
Exchange types
There are 4 supported exchange types: direct, fanout, topic and headers. The exchange type determines how an exchange processes and routes messages.
Direct exchanges
Direct exchanges are useful for 1:1 communication scenarios. Queues are bound to direct exchanges with a parameter called “routing key”. When a message arrives at a direct exchange, the broker takes that message’s routing key (if any), finds a queue bound to the exchange with the same routing key and routes the message there.
Because queues are very often bound with a routing key equal to the queue’s name, AMQP 0.9.1 has a pre-declared direct exchange known as the default exchange. The default exchange is a bit special: the broker automatically binds all queues (in the same virtual host) to it with a routing key equal to the queue name. In other words, messages delivered to the default exchange are routed to the queue whose name equals the message routing key. The default exchange’s name is an empty string.
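The routing rule described above can be modelled in a few lines of plain Ruby. This is only a sketch of the semantics: the `DirectExchangeSketch` class and its methods are hypothetical names made up for illustration and are not part of the amqp gem or of any broker.

```ruby
# Illustrative model only; not part of the amqp gem.
# A direct exchange keeps a table of routing key => bound queue names.
class DirectExchangeSketch
  def initialize
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Bind a queue under a routing key.
  def bind(queue_name, routing_key)
    @bindings[routing_key] << queue_name
    self
  end

  # Return the queues whose binding key equals the message routing key.
  def route(routing_key)
    @bindings.fetch(routing_key, [])
  end
end

# The default exchange behaves as if every queue were bound
# with a routing key equal to its own name.
default_exchange = DirectExchangeSketch.new
%w[orders invoices].each { |queue| default_exchange.bind(queue, queue) }

default_exchange.route("orders")   # routed to the "orders" queue
default_exchange.route("missing")  # no matching binding, nothing is routed
```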
As part of the standard, the server must predeclare the direct exchange ‘amq.direct’ and the fanout exchange ‘amq.fanout’ (all exchange names starting with ‘amq.’ are reserved). Attempts to declare an exchange whose name starts with ‘amq.’ will result in a channel-level exception and fail. In practice these predeclared exchanges are never used directly by client code.
Fanout exchanges
Fanout exchanges are useful for 1:n and n:m communication where one or more producers feed multiple consumers. Messages published to a fanout exchange are delivered unconditionally to every queue bound to that exchange. Each queue gets its own copy of the message.
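As a rough illustration of “each queue gets its own copy”, the following plain Ruby model ignores the routing key and appends a duplicate of the payload to every bound queue. `FanoutExchangeSketch` and its methods are hypothetical and exist only for this example.

```ruby
# Illustrative model only; not part of the amqp gem.
class FanoutExchangeSketch
  def initialize
    @queues = {}
  end

  # Register a queue with an empty message list.
  def bind(queue_name)
    @queues[queue_name] ||= []
    self
  end

  # The routing key is ignored: every bound queue receives its own copy.
  def publish(payload, _routing_key = "")
    @queues.each_value { |messages| messages << payload.dup }
    self
  end

  def messages_in(queue_name)
    @queues.fetch(queue_name)
  end
end

exchange = FanoutExchangeSketch.new.bind("audit").bind("email")
exchange.publish("user.created", "this key is ignored")
```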
Topic exchanges
Topic exchanges are used for 1:n and n:m communication scenarios. An exchange of this type uses the routing key to determine which queues to deliver the message to. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.
As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’.
The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple may look like stock.us.aapl. NASDAQ updates may use topic stock.us.nasdaq, while DAX may use stock.de.dax.
When publishing data to the exchange, bound queues subscribing to the exchange indicate which data interests them by passing a routing key for matching against the published routing key.
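The wildcard matching mentioned above follows the AMQP 0.9.1 convention: in a binding pattern, `*` stands for exactly one dot-separated word and `#` stands for zero or more words. The matcher below is a minimal sketch of that rule in plain Ruby. Brokers implement this internally, and `topic_match?` and `match_words` are hypothetical helper names.

```ruby
# Illustrative matcher only; not part of the amqp gem.
# "*" matches exactly one word, "#" matches zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when "#"
    # Try letting the hash wildcard consume 0, 1, 2, ... words.
    (0..words.length).any? { |count| match_words(rest, words.drop(count)) }
  when "*"
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

topic_match?("stock.us.*", "stock.us.aapl")  # matches
topic_match?("stock.us.*", "stock.de.dax")   # does not match
topic_match?("stock.#", "stock.de.dax")      # matches
```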
Headers exchanges
When publishing data to an exchange of type headers, bound queues subscribing to the exchange indicate which data interests them by passing arguments for matching against the headers in published messages. The form of the matching can be controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified, it defaults to ‘all’.
A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers), while a value of ‘any’ implies that at least one should match (i.e. it does an OR).
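The AND/OR behaviour of ‘x-match’ can be sketched as a small predicate. This is an illustration of the rule as described here, not broker code: `headers_match?` is a hypothetical helper, and the sketch only strips the ‘x-match’ key itself from the binding arguments before comparing.

```ruby
# Illustrative matcher only; not part of the amqp gem.
def headers_match?(binding_arguments, message_headers)
  # "all" is the default when x-match is not given.
  mode = binding_arguments.fetch("x-match", "all")
  expected = binding_arguments.reject { |key, _value| key == "x-match" }
  checks = expected.map { |key, value| message_headers[key] == value }

  mode == "any" ? checks.any? : checks.all?
end

binding_arguments = { "x-match" => "all", "format" => "pdf", "type" => "report" }
headers_match?(binding_arguments, { "format" => "pdf", "type" => "report" })
headers_match?(binding_arguments.merge("x-match" => "any"), { "format" => "pdf", "type" => "log" })
```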
As part of the AMQP standard, each server should predeclare a headers exchange named ‘amq.match’.
Key methods
Key methods of Exchange class are
Exchange durability and persistence of messages.
Learn more in our Durability guide.
RabbitMQ extensions.
The AMQP gem supports several RabbitMQ extensions that extend Exchange functionality. Learn more in VendorSpecificExtensions.
Constant Summary
- DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze
- BUILTIN_TYPES = [:fanout, :direct, :topic, :headers].freeze
Constants included from Openable
Instance Attribute Summary
-
#arguments ⇒ Hash
readonly
Additional arguments given on exchange declaration.
-
#channel ⇒ Object
readonly
Channel this exchange belongs to.
- #default_routing_key ⇒ String (also: #key) readonly
- #name ⇒ String readonly
-
#on_declare ⇒ #call
A callback that is executed once declaration notification (exchange.declare-ok) from the broker arrives.
-
#opts ⇒ Hash
Options hash this exchange instance was instantiated with.
- #status ⇒ Symbol readonly
-
#type ⇒ Symbol
readonly
Type of this exchange (one of: :direct, :fanout, :topic, :headers).
Attributes included from Entity
Declaration
- #exchange_declare(passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = nil, &block) ⇒ Object
- #redeclare(&block) ⇒ Object
Exchange to Exchange Bindings
Publishing Messages
- #basic_publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false) ⇒ Object
- #on_return(&block) ⇒ Object
Error Handling and Recovery
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed when the AMQP connection is recovered after a network failure.
Class Method Summary
-
.default(channel = nil) ⇒ Exchange
The default exchange.
Instance Method Summary
-
#auto_deleted? ⇒ Boolean
(also: #auto_deletable?)
True if this exchange is automatically deleted when it is no longer used.
- #callback ⇒ #call deprecated Deprecated.
-
#custom_type? ⇒ Boolean
True if this exchange is of a custom type (begins with x-).
-
#delete(opts = {}, &block) ⇒ NilClass
This method deletes an exchange.
-
#direct? ⇒ Boolean
True if this exchange is of type `direct`.
-
#durable? ⇒ Boolean
True if this exchange is durable.
- #exchange_delete(if_unused = false, nowait = false, &block) ⇒ Object
-
#fanout? ⇒ Boolean
True if this exchange is of type `fanout`.
- #handle_bind_ok(method) ⇒ Object
- #handle_declare_ok(method) ⇒ Object
- #handle_delete_ok(method) ⇒ Object
- #handle_unbind_ok(method) ⇒ Object
-
#headers? ⇒ Boolean
True if this exchange is of type `headers`.
-
#initialize(channel, type, name, opts = {}) {|exchange, declare_ok| ... } ⇒ Exchange
constructor
See Exchange class documentation for an introduction, information about exchange types, what use cases they are good for and so on.
-
#internal? ⇒ Boolean
True if this exchange is an internal exchange.
-
#predefined? ⇒ Boolean
True if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
-
#publish(payload, options = {}, &block) ⇒ Exchange
Publishes message to the exchange.
-
#reset ⇒ Object
Resets exchange state.
-
#topic? ⇒ Boolean
True if this exchange is of type `topic`.
-
#transient? ⇒ Boolean
(also: #temporary?)
True if this exchange is transient (non-durable).
Methods included from ProtocolMethodHandlers
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Methods included from Openable
#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?
Constructor Details
#initialize(channel, type, name, opts = {}) {|exchange, declare_ok| ... } ⇒ Exchange
See Exchange class documentation for an introduction, information about exchange types, what use cases they are good for and so on.
Predeclared exchanges
If the exchange name corresponds to one of those predeclared by the AMQP 0.9.1 specification (empty string, amq.direct, amq.fanout, amq.topic, amq.match), the declaration command won’t be sent to the broker (the only possible reply from the broker is to reject it, because predefined entities cannot be changed). The callback, if any, will be executed immediately.
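The decision described in this paragraph can be expressed as a small predicate. The sketch below follows the list of names given in the text rather than the exact conditional in the gem's constructor. `PREDECLARED_NAMES` and `declaration_needed?` are hypothetical names used only for illustration.

```ruby
# Illustrative helper only; not part of the amqp gem.
# Names taken from the list of predeclared exchanges in the paragraph above.
PREDECLARED_NAMES = ["", "amq.direct", "amq.fanout", "amq.topic", "amq.match"].freeze

# Returns true when an exchange.declare frame would need to be sent.
def declaration_needed?(name, opts = {})
  return false if opts[:no_declare]

  !PREDECLARED_NAMES.include?(name)
end

declaration_needed?("amq.fanout")                          # predeclared, skip declare
declaration_needed?("events")                              # regular exchange, declare it
declaration_needed?("events", { :no_declare => true })     # caller opted out
```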
# File 'lib/amqp/exchange.rb', line 310
def initialize(channel, type, name, opts = {}, &block)
  @channel             = channel
  @type                = type
  @opts                = self.class.(type, name, opts, block)
  @default_routing_key = opts[:routing_key] || opts[:key] || AMQ::Protocol::EMPTY_STRING
  @name                = name unless name.empty?

  @status                  = :unknown
  @default_publish_options = (opts.delete(:default_publish_options) || {
      :routing_key  => @default_routing_key,
      :mandatory    => false
    }).freeze

  @default_headers = (opts.delete(:default_headers) || {
      :content_type => DEFAULT_CONTENT_TYPE,
      :persistent   => false,
      :priority     => 0
    }).freeze

  if !(BUILTIN_TYPES.include?(type.to_sym) || type.to_s =~ /^x-.+/i)
    raise UnknownExchangeTypeError.new(BUILTIN_TYPES, type)
  end

  @channel    = channel
  @name       = name
  @type       = type

  # register pre-declared exchanges
  if @name == AMQ::Protocol::EMPTY_STRING || @name =~ /^amq\.(direct|fanout|topic|match|headers)/
    @channel.register_exchange(self)
  end

  super(channel.connection)

  # The AMQP 0.8 specification (as well as 0.9.1) in 1.1.4.2 mentions
  # that Exchange.Declare-Ok confirms the name of the exchange (because
  # of automatically named), which is logical to interpret that this
  # functionality should be the same as for Queue (though it isn't
  # explicitly told in the specification). In fact, RabbitMQ (and
  # probably other implementations as well) doesn't support it and
  # there is a default exchange with an empty name (so-called default
  # or nameless exchange), so if we'd send Exchange.Declare(exchange=""),
  # then RabbitMQ interpret it as if we'd try to redefine this default
  # exchange so it'd produce an error.
  unless name == "amq.#{type}" or name.empty? or opts[:no_declare]
    @status = :opening

    unless @opts[:no_declare]
      @channel.once_open do
        if block
          shim = Proc.new do |exchange, declare_ok|
            case block.arity
            when 1 then block.call(exchange)
            else
              block.call(exchange, declare_ok)
            end
          end

          self.exchange_declare(@opts[:passive], @opts[:durable], @opts[:auto_delete], @opts[:internal], @opts[:nowait], @opts[:arguments], &shim)
        else
          self.exchange_declare(@opts[:passive], @opts[:durable], @opts[:auto_delete], @opts[:internal], @opts[:nowait], @opts[:arguments])
        end
      end
    end
  else
    # Call the callback immediately, as given exchange is already
    # declared.
    @status = :opened
    block.call(self) if block
  end

  @on_declare = block
end
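The constructor wraps the user's block in a shim that inspects the block's arity, so a callback may take either just the exchange or the exchange plus the declare-ok reply. The following standalone sketch isolates that dispatch. `call_declare_callback` is a hypothetical helper, and the symbols stand in for real exchange and reply objects.

```ruby
# Illustrative helper only; mirrors the arity check used by the shim above.
def call_declare_callback(callback, exchange, declare_ok)
  case callback.arity
  when 1 then callback.call(exchange)
  else callback.call(exchange, declare_ok)
  end
end

one_argument  = proc { |exchange| [exchange] }
two_arguments = proc { |exchange, declare_ok| [exchange, declare_ok] }

call_declare_callback(one_argument, :exchange, :declare_ok)
call_declare_callback(two_arguments, :exchange, :declare_ok)
```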
Instance Attribute Details
#arguments ⇒ Hash (readonly)
Returns additional arguments given on exchange declaration. Typically used by AMQP extensions.
# File 'lib/amqp/exchange.rb', line 206
def arguments
  @arguments
end
#channel ⇒ Object (readonly)
Channel this exchange belongs to.
# File 'lib/amqp/exchange.rb', line 203
def channel
  @channel
end
#default_routing_key ⇒ String (readonly) Also known as: key
# File 'lib/amqp/exchange.rb', line 210
def default_routing_key
  @default_routing_key
end
#name ⇒ String (readonly)
# File 'lib/amqp/exchange.rb', line 184
def name
  @name
end
#on_declare ⇒ #call
Returns A callback that is executed once declaration notification (exchange.declare-ok) from the broker arrives.
# File 'lib/amqp/exchange.rb', line 200
def on_declare
  @on_declare
end
#opts ⇒ Hash
Options hash this exchange instance was instantiated with
# File 'lib/amqp/exchange.rb', line 196
def opts
  @opts
end
#status ⇒ Symbol (readonly)
# File 'lib/amqp/exchange.rb', line 192
def status
  @status
end
#type ⇒ Symbol (readonly)
Type of this exchange (one of: :direct, :fanout, :topic, :headers).
# File 'lib/amqp/exchange.rb', line 188
def type
  @type
end
Class Method Details
.default(channel = nil) ⇒ Exchange
Do not confuse default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn’t have any special routing semantics.
The default exchange. The default exchange is a predefined direct exchange. It cannot be removed. Every queue is bound to this (direct) exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message’s routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue Q with this name, that message will be routed to Q.
# File 'lib/amqp/exchange.rb', line 178
def self.default(channel = nil)
  self.new(channel || AMQP::Channel.new, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
Instance Method Details
#auto_deleted? ⇒ Boolean Also known as: auto_deletable?
Returns true if this exchange is automatically deleted when it is no longer used.
# File 'lib/amqp/exchange.rb', line 590
def auto_deleted?
  !!@opts[:auto_delete]
end
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
# File 'lib/amqp/exchange.rb', line 838
def auto_recover
  self.redeclare unless predefined?
end
#basic_publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false) ⇒ Object
# File 'lib/amqp/exchange.rb', line 764
def basic_publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false)
  headers = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.merge(user_headers)
  @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, false, @connection.frame_max), @channel)

  # publisher confirms support. MK.
  @channel.exec_callback(:after_publish)
  self
end
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amqp/exchange.rb', line 809
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end
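The “last one wins” behaviour of these recovery callbacks can be pictured with a tiny registry that stores a single block per event name. This is only a sketch of the semantics: `CallbackRegistrySketch` is a hypothetical class, and the gem's own Callbacks module may store callbacks differently.

```ruby
# Illustrative model only; not the gem's Callbacks implementation.
class CallbackRegistrySketch
  def initialize
    @callbacks = {}
  end

  # Replaces any block previously registered for the event.
  def redefine_callback(event, &block)
    @callbacks[event] = block
    self
  end

  # Runs the current block for the event, if any.
  def exec_callback(event, *args)
    callback = @callbacks[event]
    callback ? callback.call(*args) : nil
  end
end

registry = CallbackRegistrySketch.new
registry.redefine_callback(:before_recovery) { :first }
registry.redefine_callback(:before_recovery) { :second }
registry.exec_callback(:before_recovery)  # only the block defined last runs
```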
#bind(source, opts = {}, &block) ⇒ Object
# File 'lib/amqp/exchange.rb', line 694
def bind(source, opts = {}, &block)
  source      = source.name if source.respond_to?(:name)
  routing_key = opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING
  arguments   = opts[:arguments] || {}
  nowait      = opts[:nowait] || block.nil?
  @channel.once_open do
    @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@channel.id, @name, source, routing_key, nowait, arguments))
    unless nowait
      self.define_callback(:bind, &block)
      @channel.exchanges_awaiting_bind_ok.push(self)
    end
  end
  self
end
#callback ⇒ #call
Compatibility alias for #on_declare.
# File 'lib/amqp/exchange.rb', line 218
def callback
  @on_declare
end
#custom_type? ⇒ Boolean
Returns true if this exchange is of a custom type (begins with x-).
# File 'lib/amqp/exchange.rb', line 410
def custom_type?
  @type.to_s =~ /^x-.+/i
end
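Together with BUILTIN_TYPES, the same `x-` prefix pattern decides whether the constructor accepts an exchange type. A minimal standalone sketch of that check follows. `BUILTIN_EXCHANGE_TYPES` and `valid_exchange_type?` are hypothetical names, and `"x-consistent-hash"` is used purely as an example of a custom type string.

```ruby
# Illustrative helper only; not part of the amqp gem.
BUILTIN_EXCHANGE_TYPES = [:fanout, :direct, :topic, :headers].freeze

# A type is accepted if it is built in or looks like a custom "x-" type.
def valid_exchange_type?(type)
  BUILTIN_EXCHANGE_TYPES.include?(type.to_sym) || type.to_s.match?(/^x-.+/i)
end

valid_exchange_type?(:direct)
valid_exchange_type?("x-consistent-hash")
valid_exchange_type?(:bogus)
```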
#delete(opts = {}, &block) ⇒ NilClass
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are deleted, too. Further attempts to publish messages to a deleted exchange will result in a channel-level exception.
# File 'lib/amqp/exchange.rb', line 562
def delete(opts = {}, &block)
  @channel.once_open do
    exchange_delete(opts.fetch(:if_unused, false), opts.fetch(:nowait, false), &block)
  end

  # backwards compatibility
  nil
end
#direct? ⇒ Boolean
Returns true if this exchange is of type `direct`.
# File 'lib/amqp/exchange.rb', line 392
def direct?
  @type == :direct
end
#durable? ⇒ Boolean
Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.
Returns true if this exchange is durable.
# File 'lib/amqp/exchange.rb', line 575
def durable?
  !!@opts[:durable]
end
#exchange_declare(passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = nil, &block) ⇒ Object
# File 'lib/amqp/exchange.rb', line 605
def exchange_declare(passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = nil, &block)
  # for re-declaration
  @passive     = passive
  @durable     = durable
  @auto_delete = auto_delete
  @arguments   = arguments
  @internal    = internal

  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, passive, durable, auto_delete, internal, nowait, arguments))

  unless nowait
    self.define_callback(:declare, &block)
    @channel.exchanges_awaiting_declare_ok.push(self)
  end

  self
end
#exchange_delete(if_unused = false, nowait = false, &block) ⇒ Object
# File 'lib/amqp/exchange.rb', line 641
def exchange_delete(if_unused = false, nowait = false, &block)
  @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@channel.id, @name, if_unused, nowait))

  unless nowait
    self.define_callback(:delete, &block)

    # TODO: delete itself from exchanges cache
    @channel.exchanges_awaiting_delete_ok.push(self)
  end

  self
end
#fanout? ⇒ Boolean
Returns true if this exchange is of type `fanout`.
# File 'lib/amqp/exchange.rb', line 386
def fanout?
  @type == :fanout
end
#handle_bind_ok(method) ⇒ Object
# File 'lib/amqp/exchange.rb', line 857
def handle_bind_ok(method)
  self.exec_callback_once(:bind, method)
end
#handle_declare_ok(method) ⇒ Object
# File 'lib/amqp/exchange.rb', line 851
def handle_declare_ok(method)
  @channel.register_exchange(self)

  self.exec_callback_once_yielding_self(:declare, method)
end
#handle_delete_ok(method) ⇒ Object
# File 'lib/amqp/exchange.rb', line 865
def handle_delete_ok(method)
  self.exec_callback_once(:delete, method)
end
#handle_unbind_ok(method) ⇒ Object
# File 'lib/amqp/exchange.rb', line 861
def handle_unbind_ok(method)
  self.exec_callback_once(:unbind, method)
end
#headers? ⇒ Boolean
Returns true if this exchange is of type `headers`.
# File 'lib/amqp/exchange.rb', line 404
def headers?
  @type == :headers
end
#internal? ⇒ Boolean
Returns true if this exchange is an internal exchange.
# File 'lib/amqp/exchange.rb', line 420
def internal?
  @opts[:internal]
end
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amqp/exchange.rb', line 792
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).
# File 'lib/amqp/exchange.rb', line 823
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end
#on_return(&block) ⇒ Object
# File 'lib/amqp/exchange.rb', line 775
def on_return(&block)
  self.redefine_callback(:return, &block)

  self
end
#predefined? ⇒ Boolean
Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
# File 'lib/amqp/exchange.rb', line 415
def predefined?
  @name && ((@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i))
end
#publish(payload, options = {}, &block) ⇒ Exchange
The optional callback this method takes DOES NOT OFFER ANY GUARANTEES ABOUT DATA DELIVERY and must not be used as a “delivery callback”. The only way to guarantee delivery in a distributed environment is to use an acknowledgement mechanism, such as AMQP transactions or the lightweight “publisher confirms” RabbitMQ extension supported by the amqp gem. See the Durability and message persistence and Working With Exchanges guides for details.
Please make sure you read the Durability and message persistence guide, which covers exchange durability vs. message persistence.
Publishes message to the exchange. The message will be routed to queues by the exchange and distributed to any active consumers. Routing logic is determined by exchange type and configuration as well as message attributes (like :routing_key or message headers).
Published data is opaque and not modified by Ruby amqp gem in any way. Serialization of data with JSON, Thrift, BSON or similar libraries before publishing is very common.
Data serialization
Note that this method calls #to_s on the payload argument value. Applications are encouraged to serialize data before publishing (using JSON, Thrift, Protocol Buffers or another serialization library). Note that because AMQP is a binary protocol, text formats like JSON largely lose their strong point of being easy to inspect as data travels across the network, so BSON may be a good fit.
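Because the payload is converted with #to_s, passing a Hash directly would send Ruby's inspect-style text rather than a portable format. A minimal sketch of serializing explicitly with the standard library's JSON module is shown below. `serialize_payload` and `deserialize_payload` are hypothetical helper names, and the event data is made up for illustration.

```ruby
require "json"

# Hypothetical helpers for illustration; not part of the amqp gem.
def serialize_payload(data)
  JSON.generate(data)
end

def deserialize_payload(payload)
  JSON.parse(payload)
end

event    = { "symbol" => "AAPL", "price" => 187.5 }
payload  = serialize_payload(event)      # a String, suitable as a message body
restored = deserialize_payload(payload)  # what a consumer would reconstruct
```

The resulting string is what an application would hand to #publish, and the consumer side applies the inverse step.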
Publishing and message persistence
In cases when your application cannot afford to lose a message, AMQP 0.9.1 has several features to offer:
- Persistent messages
- Message acknowledgements
- Transactions
- (a RabbitMQ-specific extension) Publisher confirms
This is a broad topic and we dedicate a separate guide, Durability and message persistence, to it.
Publishing callback and guarantees it DOES NOT offer
Exact moment when message is published is not determined and depends on many factors, including machine’s networking stack configuration, so (optional) block this method takes is scheduled for next event loop tick, and data is staged for delivery for current event loop tick. For most applications, this is good enough. The only way to guarantee a message was delivered in a distributed system is to ask a peer to send you a message back. RabbitMQ
Event loop blocking
When intermixing publishing of many messages with other workload that may take some time, event loop blocking may affect performance. There are several ways to avoid it:
- Run EventMachine in a separate thread.
- Use EventMachine.next_tick.
- Use EventMachine.defer to offload operation to EventMachine thread pool.
TBD: this subject is worth a separate guide
Sending one-off messages
If you need to send a one-off message and then stop the event loop, pass a block to #publish that will be executed after the message is pushed down the network stack, and use Session#disconnect to properly tear down the AMQP connection (see example under Examples section below).
# File 'lib/amqp/exchange.rb', line 527
def publish(payload, options = {}, &block)
  opts = @default_publish_options.merge(options)

  @channel.once_open do
    properties                 = @default_headers.merge(options)
    properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
    basic_publish(payload.to_s, opts[:key] || opts[:routing_key] || @default_routing_key, properties, opts[:mandatory])

    # don't pass block to AMQP::Exchange#publish because it will be executed
    # immediately and we want to do it later. See ruby-amqp/amqp/#67 MK.
    EventMachine.next_tick(&block) if block
  end

  self
end
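The listing above turns the `:persistent` option into the protocol-level `:delivery_mode` property (2 for persistent, 1 otherwise) after merging caller options over the default headers. The standalone sketch below reproduces just that step. `DEFAULT_HEADERS_SKETCH` and `build_properties` are hypothetical names, with default values copied from the constructor listing.

```ruby
# Illustrative helper only; not part of the amqp gem.
DEFAULT_HEADERS_SKETCH = {
  :content_type => "application/octet-stream",
  :persistent   => false,
  :priority     => 0
}.freeze

# Merge caller options over the defaults, then translate :persistent
# into the numeric :delivery_mode property.
def build_properties(options = {})
  properties = DEFAULT_HEADERS_SKETCH.merge(options)
  properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
  properties
end

build_properties                              # transient message, delivery_mode 1
build_properties({ :persistent => true })     # persistent message, delivery_mode 2
```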
#redeclare(&block) ⇒ Object
# File 'lib/amqp/exchange.rb', line 625
def redeclare(&block)
  nowait = block.nil?
  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, @passive, @durable, @auto_delete, @internal, nowait, @arguments))

  unless nowait
    self.define_callback(:declare, &block)
    @channel.exchanges_awaiting_declare_ok.push(self)
  end

  self
end
#reset ⇒ Object
Resets exchange state. Useful for error handling.
# File 'lib/amqp/exchange.rb', line 597
def reset
  initialize(@channel, @type, @name, @opts)
end
#topic? ⇒ Boolean
Returns true if this exchange is of type `topic`.
# File 'lib/amqp/exchange.rb', line 398
def topic?
  @type == :topic
end
#transient? ⇒ Boolean Also known as: temporary?
Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.
Returns true if this exchange is transient (non-durable).
# File 'lib/amqp/exchange.rb', line 583
def transient?
  !self.durable?
end
#unbind(source, opts = {}, &block) ⇒ Object
# File 'lib/amqp/exchange.rb', line 744
def unbind(source, opts = {}, &block)
  source      = source.name if source.respond_to?(:name)
  routing_key = opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING
  arguments   = opts[:arguments] || {}
  nowait      = opts[:nowait] || block.nil?
  @channel.once_open do
    @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@channel.id, @name, source, routing_key, nowait, arguments))
    unless nowait
      self.define_callback(:unbind, &block)
      @channel.exchanges_awaiting_unbind_ok.push(self)
    end
  end
  self
end