Vendor-specific AMQP extensions support in amqp gem
This Documentation Has Moved to rubyamqp.info
amqp gem documentation guides are now hosted on rubyamqp.info.
RabbitMQ extensions
Supported extensions
AMQP gem supports many RabbitMQ extensions to AMQP 0.9.1:
- Publisher confirmations (confirm.* class)
- Negative acknowledgements (basic.nack)
- Alternate Exchanges
- Per-queue Message Time-to-Live
- Queue Leases
- Sender-selected Distribution
- Validated user_id
Enabling RabbitMQ extensions
If you are using RabbitMQ as AMQP broker and want to use these extensions, simply replace
require "amqp"
with
require "amqp"
require "amqp/extensions/rabbitmq"
Per-queue Message Time-to-Live
Per-queue Message Time-to-Live (TTL) is a RabbitMQ extension to AMQP 0.9.1 that lets developers control for how long a message published to a queue can live before it is discarded. A message that has been in the queue for longer than the configured TTL is said to be dead. Dead messages will not be delivered to consumers and cannot be fetched using basic.get operation (AMQP::Queue#pop).
Message TTL is specified using x-message-ttl argument on declaration. With amqp gem, you pass it to AMQP::Queue#initialize or AMQP::Channel#queue:
# 1000 milliseconds
channel.queue("", :arguments => { "x-message-ttl" => 1000 })
When a published messages is routed to multiple queues, each of the queues gets a copy of the message. If then the message dies in one of the queues, it has no effect on copies of the message in other queues.
Example
The example below sets message TTL for a new server-named queue to be 1000 milliseconds. It then publishes several messages that are routed to the queue and tries to fetch messages using basic.get AMQP method (AMQP::Queue#pop after 0.7 and 1.5 seconds:
Learn More
See also rabbitmq.com section on Per-queue Message TTL
Publisher Confirms (Publisher Acknowledgements)
In some situations not a single message can be lost. The only reliable way of doing so is using confirmations. Publisher Confirms AMQP extension was designed to solve the reliable publishing problem.
Publisher confirms are similar to message acknowledgements documented in the Working With Queues guide but involve publisher and AMQP broker instead of consumer and AMQP broker.
Public API
To use publisher confirmations, first put channel into confirmation mode using AMQP::Channel#confirm_select:
channel.confirm_select
From this moment on, every message published on this channel will cause channel’s publisher index (message counter) to be incremented. It is possible to access using AMQP::Channel#publisher_index method. To check whether channel is in the confirmation mode, use AMQP::Channel#uses_publisher_confirmations? predicate.
To handle AMQP broker acknowledgements, define a handler using AMQP::Channel#on_ack, for example:
channel.on_ack do |basic_ack|
puts "Received basic_ack: multiple = #{basic_ack.multiple}, delivery_tag = #{basic_ack.delivery_tag}"
end
Delivery tag will indicate number of confirmed messages. If multiple attribute is true, the confirmation is for all messages up to the number delivery tag indicates. In other words, AMQP broker may confirm just one message or a batch of them.
Example
Learn More
See also rabbitmq.com section on Confirms aka Publisher Acknowledgements
basic.nack
The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.
To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.
Public API
When RabbitMQ extensions are loaded, AMQP::Channel#reject method is overriden via mixin to take one additional argument: multi (defaults to false). When it is given and is true, amqp gem will use basic.nack AMQP method instead of basic.reject, to reject multiple messages at once. Otherwise, basic.reject is used as usual.
Learn More
See also rabbitmq.com section on Confirms aka Publisher Acknowledgements
Alternate Exchanges
Alternate Exchanges is a RabbitMQ extension to AMQP 0.9.1 that lets developers define “fallback” exchanges where unroutable messages will be sent.
Public API
To specify exchange A as alternate exchange to exchange B, specify ‘alternate-exchange’ argument on declaration of B:
exchange1 = channel.fanout("ops.fallback", :auto_delete => true)
exchange2 = channel.fanout("events.collector", :auto_delete => true, :arguments => { "alternate-exchange" => "ops.fallback" })
Example
Learn More
See also rabbitmq.com section on Alternate Exchanges
Authors
This guide was written by Michael Klishin and edited by Chris Duncan.
Tell us what you think!
Please take a moment and tell us what you think about this guide on Twitter or Ruby AMQP mailing list: what was unclear? what wasn’t covered? maybe you don’t like guide style or grammar and spelling are incorrect? Readers feedback is key to making documentation better.
If mailing list communication is not an option for you for some reason, you can contact guides author directly