Vendor-specific AMQP extensions support in amqp gem

This Documentation Has Moved to rubyamqp.info

amqp gem documentation guides are now hosted on rubyamqp.info.

RabbitMQ extensions

Supported extensions

AMQP gem supports many RabbitMQ extensions to AMQP 0.9.1:

Enabling RabbitMQ extensions

If you are using RabbitMQ as AMQP broker and want to use these extensions, simply replace

require "amqp"

with


require "amqp"
require "amqp/extensions/rabbitmq"

Per-queue Message Time-to-Live

Per-queue Message Time-to-Live (TTL) is a RabbitMQ extension to AMQP 0.9.1 that lets developers control for how long a message published to a queue can live before it is discarded. A message that has been in the queue for longer than the configured TTL is said to be dead. Dead messages will not be delivered to consumers and cannot be fetched using basic.get operation (AMQP::Queue#pop).

Message TTL is specified using x-message-ttl argument on declaration. With amqp gem, you pass it to AMQP::Queue#initialize or AMQP::Channel#queue:


# 1000 milliseconds
channel.queue("", :arguments => { "x-message-ttl" => 1000 })

When a published messages is routed to multiple queues, each of the queues gets a copy of the message. If then the message dies in one of the queues, it has no effect on copies of the message in other queues.

Example

The example below sets message TTL for a new server-named queue to be 1000 milliseconds. It then publishes several messages that are routed to the queue and tries to fetch messages using basic.get AMQP method (AMQP::Queue#pop after 0.7 and 1.5 seconds:

Learn More

See also rabbitmq.com section on Per-queue Message TTL

Publisher Confirms (Publisher Acknowledgements)

In some situations not a single message can be lost. The only reliable way of doing so is using confirmations. Publisher Confirms AMQP extension was designed to solve the reliable publishing problem.

Publisher confirms are similar to message acknowledgements documented in the Working With Queues guide but involve publisher and AMQP broker instead of consumer and AMQP broker.

Public API

To use publisher confirmations, first put channel into confirmation mode using AMQP::Channel#confirm_select:


channel.confirm_select

From this moment on, every message published on this channel will cause channel’s publisher index (message counter) to be incremented. It is possible to access using AMQP::Channel#publisher_index method. To check whether channel is in the confirmation mode, use AMQP::Channel#uses_publisher_confirmations? predicate.

To handle AMQP broker acknowledgements, define a handler using AMQP::Channel#on_ack, for example:


channel.on_ack do |basic_ack|
 puts "Received basic_ack: multiple = #{basic_ack.multiple}, delivery_tag = #{basic_ack.delivery_tag}"
end

Delivery tag will indicate number of confirmed messages. If multiple attribute is true, the confirmation is for all messages up to the number delivery tag indicates. In other words, AMQP broker may confirm just one message or a batch of them.

Example

Learn More

See also rabbitmq.com section on Confirms aka Publisher Acknowledgements

basic.nack

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.

To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.

Public API

When RabbitMQ extensions are loaded, AMQP::Channel#reject method is overriden via mixin to take one additional argument: multi (defaults to false). When it is given and is true, amqp gem will use basic.nack AMQP method instead of basic.reject, to reject multiple messages at once. Otherwise, basic.reject is used as usual.

Learn More

See also rabbitmq.com section on Confirms aka Publisher Acknowledgements

Alternate Exchanges

Alternate Exchanges is a RabbitMQ extension to AMQP 0.9.1 that lets developers define “fallback” exchanges where unroutable messages will be sent.

Public API

To specify exchange A as alternate exchange to exchange B, specify ‘alternate-exchange’ argument on declaration of B:


exchange1 = channel.fanout("ops.fallback",     :auto_delete => true)
exchange2 = channel.fanout("events.collector", :auto_delete => true, :arguments => { "alternate-exchange" => "ops.fallback" })

Example

Learn More

See also rabbitmq.com section on Alternate Exchanges

Authors

This guide was written by Michael Klishin and edited by Chris Duncan.

Tell us what you think!

Please take a moment and tell us what you think about this guide on Twitter or Ruby AMQP mailing list: what was unclear? what wasn’t covered? maybe you don’t like guide style or grammar and spelling are incorrect? Readers feedback is key to making documentation better.

If mailing list communication is not an option for you for some reason, you can contact guides author directly