Class: AMQP::Exchange

Inherits:
Object
  • Object
Extended by:
ProtocolMethodHandlers
Includes:
Entity
Defined in:
lib/amqp/exchange.rb

Overview

Note:

Please make sure you read the section on exchange durability vs. message persistence.

What are AMQP exchanges?

An AMQP exchange is where AMQP clients send messages. An exchange may also be described as a router or a matcher. Every published message is received by an exchange which, depending on its type and the message attributes, determines how to deliver the message.

Entities that forward messages to consumers (or that consumers fetch messages from on demand) are called queues. Exchanges are associated with queues via bindings. Roughly speaking, bindings determine which queues the messages placed in a given exchange end up in.

AMQP bindings

Closely related to exchanges is the concept of bindings. A binding is the relationship between an exchange and a message queue that tells the exchange how to route messages. Bindings are set up by AMQP applications (usually the application that owns and uses the message queue sets up bindings for it). An exchange may be bound to zero, one or more queues.

Exchange types

There are 4 supported exchange types: direct, fanout, topic and headers. The exchange type determines how an exchange processes and routes messages.

Direct exchanges

Direct exchanges are useful for 1:1 communication scenarios. Queues are bound to direct exchanges with a parameter called “routing key”. When a message arrives at a direct exchange, the broker takes that message’s routing key (if any), finds a queue bound to the exchange with the same routing key and routes the message there.
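The routing rule described above can be illustrated with a toy in-memory model. This is only a sketch in plain Ruby: the ToyDirectExchange class and its methods are hypothetical names made up for illustration and are not part of the amqp gem, and plain arrays stand in for queues.

```ruby
# Toy in-memory model of direct exchange routing.
# ToyDirectExchange is a hypothetical name, not an amqp gem class.
class ToyDirectExchange
  def initialize
    # routing key => list of bound queues (arrays stand in for queues)
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Binds a queue to this exchange under the given routing key.
  def bind(queue, routing_key)
    @bindings[routing_key] << queue
    self
  end

  # Delivers the payload to every queue bound with exactly this routing key.
  # Returns the number of queues that received the message.
  def publish(payload, routing_key)
    queues = @bindings.fetch(routing_key, [])
    queues.each { |queue| queue << payload }
    queues.size
  end
end

replies  = []
reports  = []
exchange = ToyDirectExchange.new
exchange.bind(replies, "email.replies").bind(reports, "email.reports")

exchange.publish("re: hello", "email.replies") # routed to replies only
exchange.publish("dropped", "email.unknown")   # no binding matches
```

In this model a message whose routing key matches no binding is simply not delivered to any queue.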

Because queues are very often bound with a routing key equal to the queue’s name, AMQP 0.9.1 has a pre-declared direct exchange known as the default exchange. The default exchange is a bit special: the broker automatically binds all queues (in the same virtual host) to it with a routing key equal to the queue name. In other words, messages delivered to the default exchange are routed to the queue whose name equals the message routing key. The default exchange’s name is an empty string.

As part of the standard, the server must predeclare the direct exchange ‘amq.direct’ and the fanout exchange ‘amq.fanout’ (all exchange names starting with ‘amq.’ are reserved). Attempts to declare an exchange whose name starts with ‘amq.’ will fail with a channel-level exception. In practice these predeclared exchanges are never used directly by client code.

Fanout exchanges

Fanout exchanges are useful for 1:n and n:m communication where one or more producers feed multiple consumers. Messages published to a fanout exchange are delivered unconditionally to all queues that are bound to that exchange. Each queue gets its own copy of the message.
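Fanout delivery can be sketched in the same toy style. ToyFanoutExchange is a hypothetical class written for this illustration only (it is not the amqp gem API); arrays stand in for queues, and copying the payload with dup is just one way to model "its own copy".

```ruby
# Toy in-memory model of fanout delivery.
# ToyFanoutExchange is a hypothetical name, not an amqp gem class.
class ToyFanoutExchange
  def initialize
    @queues = []
  end

  # Fanout bindings carry no routing key.
  def bind(queue)
    @queues << queue
    self
  end

  # Every bound queue receives its own copy of the payload.
  # The routing key is accepted but ignored.
  def publish(payload, _routing_key = "")
    @queues.each { |queue| queue << payload.dup }
    @queues.size
  end
end

indexer = []
auditor = []
updates = ToyFanoutExchange.new
updates.bind(indexer).bind(auditor)
updates.publish("document 42 changed", "ignored.key")
```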

Topic exchanges

Topic exchanges are used for 1:n and n:m communication scenarios. An exchange of this type uses the routing key to determine which queues to deliver the message to. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.

As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’.

The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple may look like stock.us.aapl. NASDAQ updates may use topic stock.us.nasdaq, while DAX may use stock.de.dax.

Queues bound to the exchange indicate which data interests them by passing a routing key (pattern) that is matched against the routing key of each published message.
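The text above only says that wildcard matching is allowed. In AMQP 0.9.1 topic patterns, `*` stands for exactly one dot-separated word and `#` stands for zero or more words. The following is a minimal sketch of such a matcher in plain Ruby; topic_match? and match_words are hypothetical helper names used here for illustration, and real brokers implement this matching internally.

```ruby
# Returns true when a binding pattern matches a routing key,
# using "*" for exactly one word and "#" for zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern matches only an exhausted key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # "#" may absorb zero or more words: try every possible split point.
    (0..key_words.size).any? { |taken| match_words(rest, key_words.drop(taken)) }
  when "*"
    # "*" must consume exactly one word.
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    # A literal word must be equal to the next word of the key.
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

topic_match?("stock.us.*", "stock.us.aapl") # one word after stock.us
topic_match?("stock.#", "stock.de.dax")     # any number of words after stock
```

With this sketch, a queue interested in all US stocks could bind with stock.us.* while a queue interested in every stock update could bind with stock.#.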

Headers exchanges

With an exchange of type headers, queues bound to the exchange indicate which data interests them by passing arguments that are matched against the headers of published messages. The form of the matching can be controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified, it defaults to ‘all’.

A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers), while a value of ‘any’ implies that at least one should match (i.e. it does an OR).
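The ‘any’ versus ‘all’ rule can be sketched as a small predicate. This is an illustration in plain Ruby under stated assumptions: headers_match? is a hypothetical helper (not an amqp gem method), both hashes use string keys, and values are compared with simple equality.

```ruby
# Decides whether a message's headers satisfy a binding's arguments.
# The "x-match" argument selects the mode and defaults to "all".
def headers_match?(binding_arguments, message_headers)
  mode     = binding_arguments.fetch("x-match", "all")
  # "x-match" controls the matching itself and is not compared.
  expected = binding_arguments.reject { |name, _value| name == "x-match" }

  if mode == "any"
    # OR: at least one expected header must be present with an equal value.
    expected.any? { |name, value| message_headers[name] == value }
  else
    # AND: every expected header must be present with an equal value.
    expected.all? { |name, value| message_headers[name] == value }
  end
end

binding_args = { "x-match" => "any", "format" => "pdf", "type" => "report" }
headers_match?(binding_args, { "format" => "pdf", "type" => "log" })
```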

As part of the AMQP standard, each server should predeclare a headers exchange named ‘amq.match’.

Key methods

Key methods of Exchange class are

Exchange durability and persistence of messages.

Learn more in our Durability guide.

RabbitMQ extensions.

The AMQP gem supports several RabbitMQ extensions that extend Exchange functionality. Learn more in VendorSpecificExtensions.

Constant Summary

DEFAULT_CONTENT_TYPE =
"application/octet-stream".freeze
BUILTIN_TYPES =
[:fanout, :direct, :topic, :headers].freeze

Constants included from Openable

Openable::VALUES

Instance Attribute Summary

Attributes included from Entity

#callbacks

Declaration

Exchange to Exchange Bindings

Publishing Messages

Error Handling and Recovery

Class Method Summary

Instance Method Summary

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

#initialize(channel, type, name, opts = {}) {|exchange, declare_ok| ... } ⇒ Exchange

See the Exchange class documentation for an introduction, information about exchange types, what use cases they are good for and so on.

Predeclared exchanges

If the exchange name corresponds to one of those predeclared by the AMQP 0.9.1 specification (empty string, amq.direct, amq.fanout, amq.topic, amq.match), the declaration command won’t be sent to the broker (because the only possible reply from the broker is to reject it: predefined entities cannot be changed). The callback, if any, will be executed immediately.

Examples:

Instantiating a fanout exchange using constructor


AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel|
    AMQP::Exchange.new(channel, :fanout, "search.index.updates") do |exchange, declare_ok|
      # by now exchange is ready and waiting
    end
  end
end

Instantiating a direct exchange using Channel#direct


AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel|
    channel.direct("email.replies_listener") do |exchange, declare_ok|
      # by now exchange is ready and waiting
    end
  end
end

Parameters:

  • channel (Channel)

    AMQP channel this exchange is associated with

  • type (Symbol)

    Exchange type

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

  • :no_declare (Boolean) — default: true

    If set, the exchange declaration command won’t be sent to the broker. This makes it possible to forcefully skip declaration. We recommend that only experienced developers consider this option.

  • :default_routing_key (String) — default: nil

    Default routing key that will be used by #publish when no routing key is passed explicitly. It is perfectly fine for applications to always specify the routing key to #publish.

  • :arguments (Hash) — default: nil

    A hash of optional arguments with the declaration. Some brokers implement AMQP extensions using x-prefixed declaration arguments.

Yields:

  • (exchange, declare_ok)

    Yields the successfully declared exchange instance and the AMQP method (exchange.declare-ok) instance. The latter is optional.

Yield Parameters:

  • exchange (Exchange)

    Exchange that is successfully declared and is ready to be used.

  • declare_ok (AMQP::Protocol::Exchange::DeclareOk)

    AMQP method (exchange.declare-ok) instance.




# File 'lib/amqp/exchange.rb', line 310

def initialize(channel, type, name, opts = {}, &block)
  @channel             = channel
  @type                = type
  @opts                = self.class.add_default_options(type, name, opts, block)
  @default_routing_key = opts[:routing_key] || opts[:key] || AMQ::Protocol::EMPTY_STRING
  @name                = name unless name.empty?

  @status                  = :unknown
  @default_publish_options = (opts.delete(:default_publish_options) || {
      :routing_key  => @default_routing_key,
      :mandatory    => false
    }).freeze

  @default_headers = (opts.delete(:default_headers) || {
      :content_type => DEFAULT_CONTENT_TYPE,
      :persistent   => false,
      :priority     => 0
    }).freeze

  if !(BUILTIN_TYPES.include?(type.to_sym) || type.to_s =~ /^x-.+/i)
    raise UnknownExchangeTypeError.new(BUILTIN_TYPES, type)
  end

  @channel    = channel
  @name       = name
  @type       = type

  # register pre-declared exchanges
  if @name == AMQ::Protocol::EMPTY_STRING || @name =~ /^amq\.(direct|fanout|topic|match|headers)/
    @channel.register_exchange(self)
  end

  super(channel.connection)

  # The AMQP 0.8 specification (as well as 0.9.1) in 1.1.4.2 mentions
  # that Exchange.Declare-Ok confirms the name of the exchange (because
  # of automatically-named), which is logical to interpret that this
  # functionality should be the same as for Queue (though it isn't
  # explicitly told in the specification). In fact, RabbitMQ (and
  # probably other implementations as well) doesn't support it and
  # there is a default exchange with an empty name (so-called default
  # or nameless exchange), so if we'd send Exchange.Declare(exchange=""),
  # then RabbitMQ interprets it as if we'd try to redefine this default
  # exchange so it'd produce an error.
  unless name == "amq.#{type}" or name.empty? or opts[:no_declare]
    @status = :opening

    unless @opts[:no_declare]
      @channel.once_open do
        if block
          shim = Proc.new do |exchange, declare_ok|
            case block.arity
            when 1 then block.call(exchange)
            else
              block.call(exchange, declare_ok)
            end
          end

          self.exchange_declare(@opts[:passive], @opts[:durable], @opts[:auto_delete], @opts[:internal], @opts[:nowait], @opts[:arguments], &shim)
        else
          self.exchange_declare(@opts[:passive], @opts[:durable], @opts[:auto_delete], @opts[:internal], @opts[:nowait], @opts[:arguments])
        end
      end
    end
  else
    # Call the callback immediately, as given exchange is already
    # declared.
    @status = :opened
    block.call(self) if block
  end

  @on_declare = block
end
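The constructor wraps the user's block in a shim that inspects block.arity, so a block declared with a single parameter receives only the exchange while any other block receives both the exchange and the declare-ok method. The pattern can be tried in isolation with the sketch below; arity_shim is a hypothetical helper name and symbols stand in for the real exchange and declare-ok objects.

```ruby
# Wraps a block so that one-parameter blocks get only the first argument.
# Mirrors the arity check used in the constructor above.
def arity_shim(block)
  proc do |exchange, declare_ok|
    case block.arity
    when 1 then block.call(exchange)
    else        block.call(exchange, declare_ok)
    end
  end
end

one_arg  = proc { |exchange| [exchange] }
two_args = proc { |exchange, declare_ok| [exchange, declare_ok] }

arity_shim(one_arg).call(:exchange, :declare_ok)  # block sees the exchange only
arity_shim(two_args).call(:exchange, :declare_ok) # block sees both values
```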

Instance Attribute Details

#arguments ⇒ Hash (readonly)

Returns additional arguments given on exchange declaration. Typically used by AMQP extensions.

Returns:

  • (Hash)

    Additional arguments given on exchange declaration. Typically used by AMQP extensions.



# File 'lib/amqp/exchange.rb', line 206

def arguments
  @arguments
end

#channel ⇒ Object (readonly)

Channel this exchange belongs to.



# File 'lib/amqp/exchange.rb', line 203

def channel
  @channel
end

#default_routing_key ⇒ String (readonly) Also known as: key

Returns:

  • (String)


# File 'lib/amqp/exchange.rb', line 210

def default_routing_key
  @default_routing_key
end

#name ⇒ String (readonly)

Returns:

  • (String)


# File 'lib/amqp/exchange.rb', line 184

def name
  @name
end

#on_declare ⇒ #call

Returns a callback that is executed once the declaration notification (exchange.declare-ok) from the broker arrives.

Returns:

  • (#call)

    A callback that is executed once the declaration notification (exchange.declare-ok) from the broker arrives.



# File 'lib/amqp/exchange.rb', line 200

def on_declare
  @on_declare
end

#opts ⇒ Hash

Options hash this exchange instance was instantiated with

Returns:

  • (Hash)


# File 'lib/amqp/exchange.rb', line 196

def opts
  @opts
end

#status ⇒ Symbol (readonly)

Returns:

  • (Symbol)


# File 'lib/amqp/exchange.rb', line 192

def status
  @status
end

#type ⇒ Symbol (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).

Returns:

  • (Symbol)


# File 'lib/amqp/exchange.rb', line 188

def type
  @type
end

Class Method Details

.default(channel = nil) ⇒ Exchange

Note:

Do not confuse default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn’t have any special routing semantics.

The default exchange. The default exchange is a predefined direct exchange. It cannot be removed. Every queue is bound to this (direct) exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message’s routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue Q with this name, that message will be routed to Q.

Examples:

Publishing a message to the tasks queue

channel     = AMQP::Channel.new(connection)
tasks_queue = channel.queue("tasks")
AMQP::Exchange.default(channel).publish("make clean", :routing_key => "tasks")

Parameters:

  • channel (AMQP::Channel) (defaults to: nil)

    Channel to use. If not given, new AMQP channel will be opened on the default AMQP connection (accessible as AMQP.connection).

Returns:

  • (Exchange)

    An instance that corresponds to the default exchange (of type direct).




# File 'lib/amqp/exchange.rb', line 178

def self.default(channel = nil)
  self.new(channel || AMQP::Channel.new, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end

Instance Method Details

#auto_deleted? ⇒ Boolean Also known as: auto_deletable?

Returns true if this exchange is automatically deleted when it is no longer used.

Returns:

  • (Boolean)

    true if this exchange is automatically deleted when it is no longer used



# File 'lib/amqp/exchange.rb', line 590

def auto_deleted?
  !!@opts[:auto_delete]
end

#auto_recover ⇒ Object

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



# File 'lib/amqp/exchange.rb', line 838

def auto_recover
  self.redeclare unless predefined?
end

#basic_publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false) ⇒ Object



# File 'lib/amqp/exchange.rb', line 764

def basic_publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false)
  headers = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.merge(user_headers)
  @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, false, @connection.frame_max), @channel)

  # publisher confirms support. MK.
  @channel.exec_callback(:after_publish)
  self
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/exchange.rb', line 809

def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#bind(source, opts = {}, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 694

def bind(source, opts = {}, &block)
  source = source.name if source.respond_to?(:name)
  routing_key = opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING
  arguments = opts[:arguments] || {}
  nowait = opts[:nowait] || block.nil?
  @channel.once_open do
    @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@channel.id, @name, source, routing_key, nowait, arguments))
    unless nowait
      self.define_callback(:bind, &block)
      @channel.exchanges_awaiting_bind_ok.push(self)
    end
  end
  self
end

#callback ⇒ #call

Deprecated.

Compatibility alias for #on_declare.

Returns:

  • (#call)


# File 'lib/amqp/exchange.rb', line 218

def callback
  @on_declare
end

#custom_type? ⇒ Boolean

Returns true if this exchange is of a custom type (begins with x-).

Returns:

  • (Boolean)

    true if this exchange is of a custom type (begins with x-)



# File 'lib/amqp/exchange.rb', line 410

def custom_type?
  @type.to_s =~ /^x-.+/i
end
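The same x- prefix pattern is used by the constructor, together with BUILTIN_TYPES, to decide whether an exchange type is accepted. The sketch below reproduces that check as a standalone predicate; acceptable_type? is a hypothetical helper name (the constructor raises UnknownExchangeTypeError inline instead of calling a helper), and the double negation is added here only to turn the match position returned by =~ into a strict boolean.

```ruby
BUILTIN_TYPES = [:fanout, :direct, :topic, :headers].freeze

# True for the four built-in types and for custom types whose
# name starts with "x-" followed by at least one character.
def acceptable_type?(type)
  BUILTIN_TYPES.include?(type.to_sym) || !!(type.to_s =~ /^x-.+/i)
end

acceptable_type?(:direct)             # built-in type
acceptable_type?("x-consistent-hash") # custom, broker-specific type
acceptable_type?(:bogus)              # neither built-in nor x- prefixed
```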

#delete(opts = {}, &block) ⇒ NilClass

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are deleted, too. Further attempts to publish messages to a deleted exchange will result in a channel-level exception.

Examples:

Deleting an exchange


# channel is assumed to be an open AMQP::Channel instance
exchange = channel.direct("search.indexing")
exchange.delete

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :nowait (Boolean) — default: false

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

  • :if_unused (Boolean) — default: false

    If set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

Returns:

  • (NilClass)

    nil



# File 'lib/amqp/exchange.rb', line 562

def delete(opts = {}, &block)
  @channel.once_open do
    exchange_delete(opts.fetch(:if_unused, false), opts.fetch(:nowait, false), &block)
  end

  # backwards compatibility
  nil
end

#direct? ⇒ Boolean

Returns true if this exchange is of type `direct`.

Returns:

  • (Boolean)

    true if this exchange is of type `direct`



# File 'lib/amqp/exchange.rb', line 392

def direct?
  @type == :direct
end

#durable? ⇒ Boolean

Note:

Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.

Returns true if this exchange is durable.

Returns:

  • (Boolean)

    true if this exchange is durable



# File 'lib/amqp/exchange.rb', line 575

def durable?
  !!@opts[:durable]
end

#exchange_declare(passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = nil, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 605

def exchange_declare(passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = nil, &block)
  # for re-declaration
  @passive     = passive
  @durable     = durable
  @auto_delete = auto_delete
  @arguments   = arguments
  @internal    = internal

  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, passive, durable, auto_delete, internal, nowait, arguments))

  unless nowait
    self.define_callback(:declare, &block)
    @channel.exchanges_awaiting_declare_ok.push(self)
  end

  self
end

#exchange_delete(if_unused = false, nowait = false, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 641

def exchange_delete(if_unused = false, nowait = false, &block)
  @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@channel.id, @name, if_unused, nowait))

  unless nowait
    self.define_callback(:delete, &block)

    # TODO: delete itself from exchanges cache
    @channel.exchanges_awaiting_delete_ok.push(self)
  end

  self
end

#fanout? ⇒ Boolean

Returns true if this exchange is of type `fanout`.

Returns:

  • (Boolean)

    true if this exchange is of type `fanout`



# File 'lib/amqp/exchange.rb', line 386

def fanout?
  @type == :fanout
end

#handle_bind_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 857

def handle_bind_ok(method)
  self.exec_callback_once(:bind, method)
end

#handle_declare_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 851

def handle_declare_ok(method)
  @channel.register_exchange(self)

  self.exec_callback_once_yielding_self(:declare, method)
end

#handle_delete_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 865

def handle_delete_ok(method)
  self.exec_callback_once(:delete, method)
end

#handle_unbind_ok(method) ⇒ Object



# File 'lib/amqp/exchange.rb', line 861

def handle_unbind_ok(method)
  self.exec_callback_once(:unbind, method)
end

#headers? ⇒ Boolean

Returns true if this exchange is of type `headers`.

Returns:

  • (Boolean)

    true if this exchange is of type `headers`



# File 'lib/amqp/exchange.rb', line 404

def headers?
  @type == :headers
end

#internal? ⇒ Boolean

Returns true if this exchange is an internal exchange.

Returns:

  • (Boolean)

    true if this exchange is an internal exchange



# File 'lib/amqp/exchange.rb', line 420

def internal?
  @opts[:internal]
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/exchange.rb', line 792

def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/exchange.rb', line 823

def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#on_return(&block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 775

def on_return(&block)
  self.redefine_callback(:return, &block)

  self
end

#predefined? ⇒ Boolean

Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)



# File 'lib/amqp/exchange.rb', line 415

def predefined?
  @name && ((@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i))
end
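The name check above can be exercised on its own. The sketch below copies the regular expression from the method; predefined_name? and PREDEFINED_NAME are hypothetical names used for illustration, and unlike the original method this helper returns false rather than nil for a nil name. Note that the pattern is anchored only at the start and is case-insensitive, so it also accepts longer names that merely begin with one of the reserved prefixes.

```ruby
# Same pattern as in #predefined?: anchored at the start only, case-insensitive.
PREDEFINED_NAME = /^amq\.(direct|fanout|topic|headers|match)/i

# True for the default exchange (empty name) and for names that
# start with one of the amq.* prefixes listed in the pattern.
def predefined_name?(name)
  !name.nil? && (name.empty? || !!(name =~ PREDEFINED_NAME))
end

predefined_name?("")                     # default exchange
predefined_name?("amq.topic")            # predeclared topic exchange
predefined_name?("search.index.updates") # application-defined exchange
predefined_name?("amq.directory")        # prefix match: starts with amq.direct
```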

#publish(payload, options = {}, &block) ⇒ Exchange

Note:

The optional callback this method takes DOES NOT OFFER ANY GUARANTEES ABOUT DATA DELIVERY and must not be used as a “delivery callback”. The only way to guarantee delivery in a distributed environment is to use an acknowledgement mechanism, such as AMQP transactions or the lightweight “publisher confirms” RabbitMQ extension supported by the amqp gem. See the Durability and message persistence and Working With Exchanges guides for details.

Note:

Please make sure you read the Durability and message persistence guide, which covers exchange durability vs. message persistence.

Publishes message to the exchange. The message will be routed to queues by the exchange and distributed to any active consumers. Routing logic is determined by exchange type and configuration as well as message attributes (like :routing_key or message headers).

Published data is opaque and not modified by Ruby amqp gem in any way. Serialization of data with JSON, Thrift, BSON or similar libraries before publishing is very common.

Data serialization

Note that this method calls #to_s on the payload argument value. Applications are encouraged to take care of data serialization before publishing (using JSON, Thrift, Protocol Buffers or another serialization library). Note that because AMQP is a binary protocol, text formats like JSON largely lose their strong point of being easy to inspect as data travels across the network, so BSON may be a good fit.

Publishing and message persistence

In cases when your application cannot afford to lose a message, AMQP 0.9.1 has several features to offer:

  • Persistent messages
  • Message acknowledgements
  • Transactions
  • (a RabbitMQ-specific extension) Publisher confirms

This is a broad topic and we dedicate a separate guide, Durability and message persistence, to it.

Publishing callback and guarantees it DOES NOT offer

Exact moment when message is published is not determined and depends on many factors, including machine’s networking stack configuration, so (optional) block this method takes is scheduled for next event loop tick, and data is staged for delivery for current event loop tick. For most applications, this is good enough. The only way to guarantee a message was delivered in a distributed system is to ask a peer to send you a message back. RabbitMQ

Event loop blocking

When intermixing publishing of many messages with other workloads that may take some time, event loop blocking may affect performance. There are several ways to avoid it:

  • Run EventMachine in a separate thread.
  • Use EventMachine.next_tick.
  • Use EventMachine.defer to offload operation to EventMachine thread pool.

TBD: this subject is worth a separate guide

Sending one-off messages

If you need to send a one-off message and then stop the event loop, pass a block to #publish that will be executed after message is pushed down the network stack, and use Session#disconnect to properly tear down AMQP connection (see example under Examples section below).

Examples:

Publishing a one-off message and properly closing AMQP connection then stopping the event loop:

exchange.publish(data) do
  connection.disconnect { EventMachine.stop }
end

Publishing without routing key

exchange = channel.fanout('search.indexer')
# fanout exchanges deliver messages to bound queues unconditionally,
# so routing key is unnecessary here
exchange.publish("some data")

Publishing with a routing key

exchange = channel.direct('search.indexer')
exchange.publish("some data", :routing_key => "search.index.updates")

Parameters:

  • payload (#to_s)

    Message payload (content). Note that this method calls #to_s on payload argument value. You are encouraged to take care of data serialization before publishing (using JSON, Thrift, Protocol Buffers or other serialization library).

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :routing_key (String) — default: nil

    Specifies message routing key. Routing key determines what queues messages are delivered to (exact routing algorithms vary between exchange types).

  • :mandatory (Boolean) — default: false

    This flag tells the server how to react if the message cannot be routed to a queue. If the message is mandatory, the server will return the unroutable message back to the client with the basic.return AMQP method. If the message is not mandatory, the server silently drops it.

  • :persistent (Boolean) — default: false

    When true, this message will be persisted to disk and remain in the queue until it is consumed. When false, the message is only kept in a transient store and will be lost in case of a server restart. When performance and latency are more important than durability, set :persistent => false. If durability is more important, set :persistent => true.

  • :content_type (String) — default: application/octet-stream

    Content-type of message payload.

Returns:

  • (Exchange)



# File 'lib/amqp/exchange.rb', line 527

def publish(payload, options = {}, &block)
  opts    = @default_publish_options.merge(options)

  @channel.once_open do
    properties                 = @default_headers.merge(options)
    properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
    basic_publish(payload.to_s, opts[:key] || opts[:routing_key] || @default_routing_key, properties, opts[:mandatory])

    # don't pass block to AMQP::Exchange#publish because it will be executed
    # immediately and we want to do it later. See ruby-amqp/amqp/#67 MK.
    EventMachine.next_tick(&block) if block
  end

  self
end
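The way #publish turns its options into message properties can be tried without a broker. The sketch below repeats the two relevant lines from the method body in a standalone function; publish_properties and DEFAULT_HEADERS are hypothetical names, with the hash contents taken from the @default_headers defaults shown in the constructor.

```ruby
# Defaults copied from the constructor's @default_headers.
DEFAULT_HEADERS = {
  :content_type => "application/octet-stream",
  :persistent   => false,
  :priority     => 0
}.freeze

# Merges caller options over the defaults and translates the
# :persistent flag into an AMQP delivery mode (2 when set, 1 otherwise).
def publish_properties(options = {})
  properties = DEFAULT_HEADERS.merge(options) # merge returns a new, unfrozen hash
  properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
  properties
end

publish_properties                                   # transient message
publish_properties(:persistent => true)              # persistent message
publish_properties(:content_type => "application/json")
```

As in the method above, every key passed in options ends up in the properties hash, so options such as :routing_key are merged alongside the message headers.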

#redeclare(&block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 625

def redeclare(&block)
  nowait = block.nil?
  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, @passive, @durable, @auto_delete, @internal, nowait, @arguments))

  unless nowait
    self.define_callback(:declare, &block)
    @channel.exchanges_awaiting_declare_ok.push(self)
  end

  self
end

#reset ⇒ Object

Resets exchange state. Useful for error handling.



# File 'lib/amqp/exchange.rb', line 597

def reset
  initialize(@channel, @type, @name, @opts)
end

#topic? ⇒ Boolean

Returns true if this exchange is of type `topic`.

Returns:

  • (Boolean)

    true if this exchange is of type `topic`



# File 'lib/amqp/exchange.rb', line 398

def topic?
  @type == :topic
end

#transient? ⇒ Boolean Also known as: temporary?

Note:

Please make sure you read the Exchange class documentation section on exchange durability vs. message persistence.

Returns true if this exchange is transient (non-durable).

Returns:

  • (Boolean)

    true if this exchange is transient (non-durable)



# File 'lib/amqp/exchange.rb', line 583

def transient?
  !self.durable?
end

#unbind(source, opts = {}, &block) ⇒ Object



# File 'lib/amqp/exchange.rb', line 744

def unbind(source, opts = {}, &block)
  source = source.name if source.respond_to?(:name)
  routing_key = opts[:key] || opts[:routing_key] || AMQ::Protocol::EMPTY_STRING
  arguments = opts[:arguments] || {}
  nowait = opts[:nowait] || block.nil?
  @channel.once_open do
    @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@channel.id, @name, source, routing_key, nowait, arguments))
    unless nowait
      self.define_callback(:unbind, &block)
      @channel.exchanges_awaiting_unbind_ok.push(self)
    end
  end
  self
end