
RightAMQP

DESCRIPTION

Synopsis

RightAMQP provides a high availability client for interfacing with the RightScale RabbitMQ broker using the AMQP protocol. The gem is based on AMQP version 0.6.7 and adds a number of bug fixes and enhancements, including reconnect, message return, heartbeat, and UTF-8 support. High availability is achieved by maintaining multiple broker connections, so that failed connections automatically reconnect and only connected brokers are used when routing a message. Although the HABrokerClient class is the intended primary means of accessing RabbitMQ services with this gem, the underlying AMQP services may also be used directly.

Refer to the wiki (github.com/rightscale/right_amqp/wiki) for up-to-date documentation.

Also use the built-in issues tracker (github.com/rightscale/right_amqp/issues) to report issues.

Maintained by the RightScale Cornsilk Team

Interface

The focus here is on the interface provided by the RightAMQP::HABrokerClient class for connecting to one or more brokers in high availability fashion, subscribing to queues or exchanges on each of the brokers, publishing messages using any of the brokers, and monitoring the status of the broker connections.

The namespace for high availability access is RightAMQP. The namespace for low level AMQP support remains AMQP. Both rely on eventmachine for task management.

Connecting

Creating an HABrokerClient object causes connections to one or more brokers to be created, e.g.,

b = RightAMQP::HABrokerClient.new(serializer, :user => 'user', :pass => 'secret',
                                  :vhost => '/abc', :host => 'broker0.com,broker1.com')

would result in a connection to the brokers with domain names broker0.com and broker1.com using the specified :user and :pass as RabbitMQ credentials and :vhost as the namespace in which to operate. See the detailed code documentation for other configuration options.

To know when a connection has been established, #connection_status is used, e.g.,

b.connection_status(:one_off => 60) do |status|
  if status == :connected
    # Perform other application setup including subscribing to queues
  elsif status == :failed
    puts "Could not connect to any brokers"
    EM.stop
  end
end

with the :one_off option indicating that the application wants to be notified only of the first connection status change and, in this case, is willing to wait only 60 seconds for it.

Reconnecting, Heartbeat, and Status Updates

If a broker connection is lost, the HABrokerClient automatically attempts to reconnect at the interval specified with the :reconnect_interval option. As further protection against lost connections, the :heartbeat option may be specified as the interval between AMQP keep-alive messages sent between the application and the brokers. This is useful in firewall environments.

The #connection_status method used above to detect the initial connection may also be used to monitor connectivity throughout the life of the application, e.g.,

b.connection_status do |status|
  puts "Status changed to #{status}"
end

For this request the status is reported only for the brokers as a whole, i.e., a :disconnected status means that connectivity to all brokers has been lost, and a subsequent :connected status means that the connection to at least one broker has been regained. Finer-grained control is available by indicating the specific :brokers of interest.

Subscribing, Declaring, and Unsubscribing

To receive messages via this interface, one must subscribe to one or more AMQP queues or exchanges. This is done with the #subscribe method, e.g.,

queue = {:name => 'my_queue', :options => {:durable => true}}
broker_ids = b.subscribe(queue, exchange = nil, :ack => true) do |id, msg|
  puts "Received packet #{msg.inspect} via broker #{id}"
end

causes the queue named “my_queue” to be created on all currently connected brokers in the AMQP durable fashion, meaning that it is preserved across restarts of the broker. The #subscribe call returns the ids of the brokers to which the subscribe request was submitted. The :ack option indicates to explicitly acknowledge each message as soon as it is received, rather than use implicit ack handling. The provided block is executed each time a message is received on the queue.

If serialization is configured, the message delivered here is the result of applying the serializer's #load method; otherwise the message is exactly the bytes that were placed in the queue.

To unsubscribe from a queue the #unsubscribe method is used.

To simply create a queue or exchange without subscribing to it to receive messages, use #declare.

Publishing

To publish a message to a queue or exchange, the #publish method is used, e.g.,

queue = {:type => :queue, :name => "request", :options => {:no_declare => true}}
broker_ids = b.publish(queue, message, :persistent => true, :mandatory => true)

causes the specified message to be published to the queue named “request”. The :no_declare option is specified to keep AMQP from attempting to create the exchange before publishing to it. On the publish, the :persistent option indicates that every attempt is to be made to preserve the message if the broker is stopped or crashes, but this is not a guarantee. The :mandatory option indicates that the broker is to return the message if the specified queue does not exist or is not being consumed, i.e., subscribed to by another application.

Serialization

If a serializer is supplied when the HABrokerClient is constructed, its #load method is applied to all messages received from a broker, and its #dump method is applied to all messages that are published. Even if a serializer is specified it is possible to specify :no_unserialize for a particular subscription or :no_serialize for a message being published.
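
As an illustration of the serializer contract described above, here is a minimal sketch. The JsonSerializer class name and the choice of JSON are assumptions made for this example; the only requirement stated here is that the object respond to #dump and #load.

```ruby
require 'json'

# Hypothetical serializer for illustration; any object that responds
# to #dump and #load should satisfy the contract described above.
class JsonSerializer
  # Convert an outgoing message (assumed here to be a Hash) to a String
  def dump(message)
    JSON.generate(message)
  end

  # Convert a received String back into a Hash
  def load(data)
    JSON.parse(data)
  end
end

serializer = JsonSerializer.new
packet = serializer.dump('type' => 'request', 'token' => 'abc123')
message = serializer.load(packet)
puts message['type']
```

An instance of such a class would take the place of the serializer argument in the HABrokerClient.new call shown under Connecting.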

If no serializer is supplied, individual messages published or received are not logged. Further, a serialized message is probed for the existence of certain properties, like :token, :type, and :from, and these are tracked so that if a message is returned as undeliverable, this information can be provided on the #non_delivery callback.

Message Return and Non-Delivery Callback

If a broker returns a message because it cannot be delivered to the intended recipient, e.g., because the option :mandatory was set and there are no consumers or the broker is in the process of shutting down, the HABrokerClient attempts to deliver the message using another broker in the configured set.

When it runs out of connected brokers to attempt the delivery, it declares the message undeliverable. In this case it also executes the block, if any, supplied by the application via the #non_delivery method. The data supplied includes the reason, type, token, from, and to, with the type, token, and from values being nil unless they could be extracted from the message being published.
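
A handler for this callback might look like the following sketch. The on_non_delivery name is hypothetical, and the order of the block parameters is an assumption based on the order in which the data is listed above; check the detailed code documentation for the actual signature.

```ruby
# Assumption: the block receives reason, type, token, from, and to,
# in that order. The handler name is hypothetical.
on_non_delivery = lambda do |reason, type, token, from, to|
  details = { :type => type, :token => token, :from => from }
  # Keep only the values that could be extracted from the message
  known = details.reject { |_, value| value.nil? }
  extras = known.map { |name, value| "#{name}=#{value}" }.join(' ')
  line = "Message to #{to} undeliverable: #{reason}"
  extras.empty? ? line : "#{line} (#{extras})"
end

puts on_non_delivery.call('NO_CONSUMERS', nil, nil, nil, 'request')
puts on_non_delivery.call('NO_ROUTE', 'request', 'abc123', 'client-1', 'request')

# With a client object b, the handler could be registered along these lines:
#   b.non_delivery { |*args| puts on_non_delivery.call(*args) }
```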

The possible reasons for non-delivery are:

"NO_ROUTE" - queue does not exist
"NO_CONSUMERS" - queue exists but it has no consumers
"ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service

Closing

When all connections to the brokers are to be closed, the #close method is used.

Identities

A broker is given an identity of the form “rs-broker-<hostname>-<port>” where <hostname> is its host name, e.g., broker1.com, and <port> is the TCP port number used (5672 by default). This is used in the interface when a specific broker needs to be referred to, e.g., when communicating status.
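
To make the identity format concrete, the sketch below builds identities for a comma-separated host list like the one passed as :host under Connecting. The broker_identity helper is hypothetical and is not the gem's own implementation, which may normalize host names differently.

```ruby
# Hypothetical helper mirroring the identity format described above;
# 5672 is the default port mentioned in the text.
def broker_identity(host, port = 5672)
  "rs-broker-#{host}-#{port}"
end

# Split a comma-separated host list like the :host option shown earlier
hosts = 'broker0.com,broker1.com'.split(',').map(&:strip)
identities = hosts.map { |host| broker_identity(host) }
puts identities
```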

Error Callbacks

When constructing the HABrokerClient the :exception_callback option can be specified to define the Proc to be activated on exception events. In addition an :exception_on_receive_callback Proc can be specified for activation when a message cannot be received.

Logging

To enable logging in the HABrokerClient, set RightSupport::Log::Mixin.default_logger to the logger in use; it must support the standard Ruby Logger interface. Logging can be disabled on individual #subscribe and #publish requests with :no_log. By default each message received or published is logged unless no serializer is supplied.
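
A minimal sketch of this setup, using the standard library Logger, is shown below. The log level and the defined? guard are choices made for this example so that the snippet also runs when right_support is not loaded; they are not requirements of the gem.

```ruby
require 'logger'

# Any logger supporting the standard Logger interface should work
logger = Logger.new($stdout)
logger.level = Logger::INFO

# Guard added for illustration: assign only when right_support is loaded
if defined?(RightSupport::Log::Mixin)
  RightSupport::Log::Mixin.default_logger = logger
end

logger.info('Logger configured')
```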

Detailed AMQP logging can be enabled by setting AMQP.logging = true.

Status and Stats

The current status of all brokers can be obtained with the #status method. The operation statistics are obtained with the #stats method. This method has an option for resetting the statistics.

ADDITIONAL RESOURCES

LICENSE

RightAMQP

Copyright

Copyright © 2012 RightScale, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the ‘Software’), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED ‘AS IS’, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.