Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
This Documentation Has Moved to rubyamqp.info
amqp gem documentation guides are now hosted on rubyamqp.info.
About this guide
This guide covers connection to an AMQP broker from standalone and Web applications, connection error handling, authentication failure handling and related issues.
This work is licensed under a Creative Commons Attribution 3.0 Unported License (including images & stylesheets). The source is available on GitHub.
Which versions of the amqp gem does this guide cover?
This guide covers v0.8.0 and later of the Ruby amqp gem.
Terminology
In this guide we define a standalone application as an application that does not run on a Web server like Unicorn or Passenger. The key difference is that these applications control the main Ruby VM thread and often use it to run the EventMachine event loop. When the amqp gem is used in a Web application, the main thread is occupied by the Web application server and the code required to establish a connection to an AMQP broker needs to be a little bit different.
Two ways to specify connection parameters
Connection parameters (host, port, username, vhost and so on) can be passed in two forms:
- As a hash
- As a connection URI string (à la JDBC)
Using a hash
Hash options that the amqp gem will recognize are
- :host
- :port
- :username (aliased as :user)
- :password (aliased as :pass)
- :vhost
- :ssl
- :timeout
- :frame_max
Default parameters
Default connection parameters are
{
  :host      => "127.0.0.1",
  :port      => 5672,
  :user      => "guest",
  :pass      => "guest",
  :vhost     => "/",
  :ssl       => false,
  :frame_max => 131072
}
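As a rough illustration of how a partial options hash combines with these defaults, the sketch below merges user-supplied options over the defaults and folds the :username and :password aliases into the :user and :pass keys used above. The constant names and the `effective_settings` helper are hypothetical and exist only for this example; this is not the gem's actual implementation.

```ruby
# Defaults copied from the list above.
DEFAULT_SETTINGS = {
  :host      => "127.0.0.1",
  :port      => 5672,
  :user      => "guest",
  :pass      => "guest",
  :vhost     => "/",
  :ssl       => false,
  :frame_max => 131072
}.freeze

# Alias => key used in the defaults hash.
SETTING_ALIASES = { :username => :user, :password => :pass }.freeze

# Hypothetical helper (not part of the amqp gem): returns the settings
# that would be in effect for a given options hash.
def effective_settings(options = {})
  normalized = {}
  options.each do |key, value|
    normalized[SETTING_ALIASES.fetch(key, key)] = value
  end
  DEFAULT_SETTINGS.merge(normalized)
end

settings = effective_settings(:host => "hub.megacorp.internal", :username => "hedgehog")
puts settings.inspect
```

Only the keys that were passed in are overridden; everything else keeps its default value.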
Using connection strings
It is convenient to be able to specify the AMQP connection parameters as a URI string, and various “amqp” URI schemes exist. Unfortunately, there is no standard for these URIs, so while the schemes share the same basic idea, they differ in some details. This implementation aims to encourage URIs that work as widely as possible.
Here are some examples:
- amqp://dev.rabbitmq.com
- amqp://dev.rabbitmq.com:5672
- amqp://guest:guest@dev.rabbitmq.com:5672
- amqp://hedgehog:t0ps3kr3t@hub.megacorp.internal/production
- amqps://hub.megacorp.internal/%2Fvault
The URI scheme should be “amqp”, or “amqps” if SSL is required.
The host, port, username and password are represented in the authority component of the URI in the same way as in http URIs.
The vhost is obtained from the first segment of the path, with the leading slash removed. The path should contain only a single segment (i.e., the only slash in it should be the leading one). If the vhost is to include slashes or other reserved URI characters, these should be percent-escaped.
Here are some examples that demonstrate how AMQP::Client.parse_connection_uri parses out the vhost from connection URIs:
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com") # => vhost is nil, so default ("/") will be used
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/") # => vhost is an empty string
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/%2Fvault") # => vhost is "/vault"
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/production") # => vhost is "production"
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/a.b.c") # => vhost is "a.b.c"
AMQP::Client.parse_connection_uri("amqp://dev.rabbitmq.com/foo/bar") # => ArgumentError
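To make the vhost rules concrete, here is a minimal sketch that reproduces them using only Ruby's standard `uri` library. The `sketch_parse_amqp_uri` helper is hypothetical and is not how the gem is implemented; it only mirrors the behavior described above (scheme check, authority component, single path segment, percent-decoding).

```ruby
require "uri"

# Hypothetical helper (not part of the amqp gem) that follows the rules
# described above for amqp:// and amqps:// connection URIs.
def sketch_parse_amqp_uri(connection_string)
  uri = URI.parse(connection_string)
  unless %w[amqp amqps].include?(uri.scheme)
    raise ArgumentError, "scheme must be amqp or amqps, got #{uri.scheme.inspect}"
  end

  settings = { :host => uri.host, :ssl => uri.scheme == "amqps" }
  settings[:port] = uri.port     if uri.port
  settings[:user] = uri.user     if uri.user
  settings[:pass] = uri.password if uri.password

  path = uri.path.to_s
  unless path.empty?
    segment = path[1..-1] # drop the leading slash
    if segment.include?("/")
      raise ArgumentError, "path must contain a single segment, got #{path.inspect}"
    end
    # Percent-decode the segment, so "%2Fvault" becomes "/vault".
    settings[:vhost] = segment.gsub(/%([0-9A-Fa-f]{2})/) { $1.hex.chr }
  end

  settings
end

puts sketch_parse_amqp_uri("amqps://hub.megacorp.internal/%2Fvault").inspect
```

When the path is absent, the sketch leaves :vhost out of the result so that the default ("/") can be applied later.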
Starting the event loop and connecting in standalone applications
EventMachine event loop
The amqp gem uses EventMachine under the hood and needs an EventMachine event loop to be running in order to connect to an AMQP broker or to send any data. This means that before connecting to an AMQP broker, we need to start the EventMachine reactor (get the event loop going). Here is how to do it:
require "amqp"
EventMachine.run do
  # ...
end
EventMachine.run will block the current thread until the event loop is stopped. Standalone applications often can afford to start the event loop on the main thread. If you have no experience with threading, this is a recommended way to proceed.
Using AMQP.connect with a block
Once the event loop is running, the AMQP.connect method will attempt to connect to the broker. It can be used in two ways. Here is the first one:
require "amqp"
EventMachine.run do
  # using AMQP.connect with a block
  AMQP.connect(:host => "localhost") do |client|
    # connection is open and ready to be used
  end
end
AMQP.connect takes a block that will be executed as soon as the AMQP connection is open. In order for a connection to be opened a TCP connection has to be set up, authentication has to succeed, and the broker and client need to complete negotiation of connection parameters like max frame size.
Using AMQP.connect without a callback
An alternative way of connecting is this:
require "amqp"
EventMachine.run do
  # using AMQP.connect without a block
  client = AMQP.connect(:host => "hub.megacorp.internal", :username => "hedgehog", :password => "t0ps3kr3t")
  # connection is not yet open, however, amqp gem will delay channel
  # operations until after the connection is open. Bear in mind that
  # amqp gem cannot solve every possible race condition so be careful
end
If you do not need to assign the returned value to a variable, then the “block version” is recommended because it eliminates issues that may arise from attempts to use a connection object that is not fully opened yet. For example, handling of authentication failures is simpler with the block version, as we will see in the following sections.
Using AMQP.start
EventMachine.run and AMQP.connect with a block is such a common combination that the amqp gem provides a shortcut:
require "amqp"
AMQP.start("amqp://dev.rabbitmq.com:5672") do |client|
  # connection is open and ready to be used
end
As these examples demonstrate, AMQP.connect and AMQP.start accept either a Hash of connection options or a connection URI string. See the reference documentation for each method to learn all of the options that they accept and what the default values are.
On Kernel#sleep use
When not passing a block to AMQP.connect, it is tempting to “give the connection some time to become established” by calling sleep (Kernel#sleep). Unless you are running the event loop in a separate thread, please do not do this. sleep blocks the current thread, so if the event loop is running in that thread, the event loop is blocked as well. When the event loop is blocked, no data is sent or received, so the connection does not proceed.
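The effect can be seen without EventMachine at all. The toy loop below is only an illustration under simplified assumptions (it is not how the reactor is implemented): callbacks run one after another on the current thread, so a callback that sleeps holds up the callback that stands in for network I/O.

```ruby
# Toy single-threaded "event loop" (not EventMachine): callbacks run strictly
# one after another on the current thread. Returns, for each callback, the
# number of seconds elapsed since the loop started when that callback finished.
def run_callbacks(callbacks)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  callbacks.map do |callback|
    callback.call
    Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end
end

blocking   = lambda { sleep 0.2 }   # "giving the connection some time"
network_io = lambda { :frame_sent } # stands in for socket work the loop must do

finished_at = run_callbacks([blocking, network_io])
puts "I/O callback could not finish until %.2fs had passed" % finished_at.last
```

The I/O callback does almost no work itself, yet it cannot finish before the sleeping callback returns.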
Detecting TCP connection failures
When applications connect to the broker, they need to handle connection failures. Networks are not 100% reliable and even with modern system configuration tools, like Chef or Puppet, misconfigurations can happen. Also, the broker might be down for some reason. Ideally, error detection should happen as early as possible. There are two ways of detecting TCP connection failure. The first one is to catch an exception:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a rescue statement"
puts
connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3
}
begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::TCPConnectionFailed => e
  puts "Caught AMQP::TCPConnectionFailed => TCP connection failed, as expected."
end
AMQP.connect (and AMQP.start) will raise AMQP::TCPConnectionFailed if the connection fails. Code that catches the error can write to a log about the issue or use retry to execute the begin block one more time. Because initial connection failures are usually caused by misconfiguration or a network outage, reconnection to the same endpoint (hostname, port, vhost combination) is likely to result in the same error over and over again. TBD: failover, connection to the cluster.
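Since retrying the same endpoint indefinitely rarely helps, one option is to bound the number of attempts. The sketch below shows the begin/rescue/retry pattern with a hypothetical `with_connection_retries` helper (not part of the amqp gem). To keep the example runnable without a broker it uses a stand-in error class; in a real application you would pass AMQP::TCPConnectionFailed and call AMQP.start inside the block.

```ruby
# Hypothetical helper: runs the block, retrying up to `attempts` times when
# `error_class` is raised, then re-raises the last error.
def with_connection_retries(error_class, attempts: 3, delay: 0.5)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue error_class => e
    if tries < attempts
      warn "Attempt #{tries} failed (#{e.class}), retrying in #{delay}s"
      sleep delay
      retry # re-executes the begin block
    end
    raise # out of attempts: let the caller see the failure
  end
end

# Stand-in for AMQP::TCPConnectionFailed so the sketch runs without a broker.
class FakeConnectionFailed < StandardError; end

result = with_connection_retries(FakeConnectionFailed, attempts: 3, delay: 0) do |attempt|
  raise FakeConnectionFailed, "broker unreachable" if attempt < 3
  "connected on attempt #{attempt}"
end
puts result
```

Logging each failed attempt and re-raising after the last one keeps the failure visible instead of looping silently.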
An alternative way of handling connection failure is with an errback (a callback for a specific kind of error):
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> TCP connection failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 9689,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}
AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end
The “:on_tcp_connection_failure” option accepts any object that responds to #call.
If you connect to the broker from code in a class (as opposed to top-level scope in a script), Object#method can be used to pass an object method as a handler instead of a Proc.
TBD: provide an example
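A minimal sketch of the Object#method approach follows. The `BrokerConnector` class and its method names are hypothetical; the point is only that `method(:name)` returns a Method object that responds to #call and can therefore be used wherever a Proc handler is accepted. The sketch builds the settings hash and invokes the handler by hand so that it runs without a broker; in an application the hash would be passed to AMQP.connect or AMQP.start.

```ruby
# Hypothetical class that owns both the connection settings and the handler.
class BrokerConnector
  attr_reader :failures

  def initialize
    @failures = []
  end

  # Settings that could be passed to AMQP.connect or AMQP.start.
  def connection_settings
    {
      :host    => "localhost",
      :timeout => 0.3,
      # Object#method turns an instance method into a callable object.
      :on_tcp_connection_failure => method(:handle_tcp_connection_failure)
    }
  end

  # Receives the connection settings, like the Proc handlers above.
  def handle_tcp_connection_failure(settings)
    @failures << settings
    puts "Failed to connect to #{settings[:host]}"
  end
end

connector = BrokerConnector.new
handler   = connector.connection_settings[:on_tcp_connection_failure]

# Simulate what happens on a TCP connection failure.
handler.call({ :host => "localhost" })
```

Because the handler is bound to the instance, it can record state or reach collaborators (a logger, for example) through instance variables.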
Detecting authentication failures
A connection may also fail due to authentication failure. Handling authentication failure is very similar to handling an initial TCP connection failure:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a callback"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler,
  :on_possible_authentication_failure => Proc.new { |settings|
    puts "Authentication failed, as expected, settings are: #{settings.inspect}"
    EM.stop
  }
}
AMQP.start(connection_settings) do |connection, open_ok|
  raise "This should not be reachable"
end
If no “:on_possible_authentication_failure” handler is given, the default handler raises AMQP::PossibleAuthenticationFailureError:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
puts "=> Authentication failure handling with a rescue block"
puts
handler = Proc.new { |settings| puts "Failed to connect, as expected"; EM.stop }
connection_settings = {
  :port     => 5672,
  :vhost    => "/amq_client_testbed",
  :user     => "amq_client_gem",
  :password => "amq_client_gem_password_that_is_incorrect #{Time.now.to_i}",
  :timeout  => 0.3,
  :on_tcp_connection_failure => handler
}
begin
  AMQP.start(connection_settings) do |connection, open_ok|
    raise "This should not be reachable"
  end
rescue AMQP::PossibleAuthenticationFailureError => afe
  puts "Authentication failed, as expected, caught #{afe.inspect}"
  EventMachine.stop if EventMachine.reactor_running?
end
In case you are wondering why the callback name has “possible” in it, AMQP 0.9.1 spec requires broker implementations to simply close the TCP connection without sending any more data when an exception, such as authentication failure, occurs before the AMQP connection is open. In practice, however, when a broker closes a TCP connection after a successful TCP connection has been established but before an AMQP connection is open, it means that authentication has failed.
Starting the event loop and connecting in Web applications (Ruby on Rails, Sinatra, Merb, Rack)
Web applications are different from standalone applications in that the main thread is occupied by a Web/application server like Unicorn or Thin, so you need to start the EventMachine reactor before you attempt to use AMQP.connect. In a Ruby on Rails application, probably the best place for this is in the initializer (like config/initializers/amqp.rb). For Merb applications it is config/init.rb. For Sinatra and pure Rack applications, place it next to the other configuration code.
Next, we are going to discuss issues specific to particular Web servers.
Using Ruby amqp gem with Unicorn
Unicorn is a pre-forking server
Unicorn is a pre-forking server. That means it forks worker processes that serve HTTP requests. The fork system call has several gotchas associated with it, two of which affect EventMachine and the Ruby amqp gem:
- Unintentional file descriptor sharing
- The fact that a forked child process only inherits one thread and therefore the EventMachine thread is not inherited
To avoid both problems, start the EventMachine reactor and AMQP connection after the master process forks workers. The master Unicorn process never serves HTTP requests and usually does not need to hold an AMQP connection. Next, let us see how to spin up the EventMachine reactor and connect to the broker after Unicorn forks a worker.
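The second gotcha can be observed with plain Ruby before looking at the Unicorn configuration. In the sketch below a sleeping background thread stands in for a thread running the EventMachine reactor; the variable names are illustrative, and the example assumes a platform where fork is available (it is not on Windows).

```ruby
# Stand-in for a reactor thread that was started before the fork.
background = Thread.new { sleep }
Thread.pass until background.status == "sleep"

reader, writer = IO.pipe

pid = fork do
  # Child process: only the thread that called fork exists here.
  reader.close
  writer.puts(Thread.list.size)
  writer.close
  exit!(0) # skip at_exit handlers inherited from the parent
end

# Parent process: read the count reported by the child.
writer.close
threads_in_child = reader.read.to_i
reader.close
Process.wait(pid)

threads_in_parent = Thread.list.size
background.kill

puts "threads in parent: #{threads_in_parent}, threads in child: #{threads_in_child}"
```

The child reports a single thread, which is why a reactor started in the master process is not running in the workers and has to be started again after the fork.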
Starting the EventMachine reactor and connecting to the broker after Unicorn forks worker processes
Unicorn lets you specify a configuration file to use. In that file you define a callback that Unicorn runs after it forks worker process(es):
ENV["FORKING"] = "true"
listen 3000
worker_processes 1
timeout 30
preload_app true
after_fork do |server, worker|
  require "amqp"
  # the following is *required* for Rails + "preload_app true",
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.establish_connection
  t = Thread.new { AMQP.start }
  sleep(1.0)
  EventMachine.next_tick do
    AMQP.channel ||= AMQP::Channel.new(AMQP.connection)
    AMQP.channel.queue("amqpgem.examples.rails23.warmup", :durable => true)
    3.times do |i|
      puts "[after_fork/amqp] Publishing a warmup message ##{i}"
      AMQP.channel.default_exchange.publish("A warmup message #{i} from #{Time.now.strftime('%H:%M:%S %m/%b/%Y')}", :routing_key => "amqpgem.examples.rails23.warmup")
    end
  end
end
In the example above we call AMQP.start in a separate thread, which starts the EventMachine reactor and connects to the AMQP broker, block the current thread for 1 second to let the event loop spin up, and then open a channel and publish on the next event loop tick. Sleeping is acceptable here because the event loop runs in a different thread. Publishing several warmup messages on boot is a good idea because it allows the early detection of issues that forking may cause.
Note that a configuration file can easily be used in development environments because, other than the fact that Unicorn runs in the foreground, it gives you exactly the same application boot behavior as in QA and production environments.
An example Ruby on Rails application that uses the Ruby amqp gem and Unicorn is available on GitHub.
Using the Ruby amqp gem with Passenger
Phusion Passenger is also a pre-forking server, and just as with Unicorn, the EventMachine reactor and AMQP connection should be started after it forks worker processes. The Passenger documentation has a section that explains how to avoid problems related to the behavior of the fork(2) system call, namely:
- Unintentional file descriptor sharing
- The fact that a forked child process only inherits one thread and therefore the EventMachine thread is not inherited
Using an event handler to spawn one amqp connection per worker
Passenger provides a hook that you should use for spawning AMQP connections:
if defined?(PhusionPassenger) # otherwise it breaks rake commands if you put this in an initializer
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    if forked
      # We're in a smart spawning mode
      # Now is a good place to connect to the broker
    end
  end
end
The recommended default, smart spawn mode, works the same way as Unicorn (with all of the same common pitfalls). An example application is available on GitHub.
Using the Ruby amqp gem with Thin and Goliath
Thin and Goliath start the EventMachine reactor for you, but there is a little nuance
If you use Thin or Goliath, you are all set because those two servers use EventMachine under the hood. There is no need to start the EventMachine reactor. However, depending on the application server, its version, the version of the framework and Rack middleware being used, EventMachine reactor start may be slightly delayed. To overcome this potential difficulty, use EventMachine.next_tick to delay connection until after the reactor is actually running:
EventMachine.next_tick { AMQP.connect(...) }
So, in case the EventMachine reactor is not yet running on server/application boot, the connection will not fail but will instead wait for the reactor to start. Thin and Goliath are not pre-forking servers so there is no need to re-establish the connection as you do with Unicorn and Passenger.
If it just does not work: troubleshooting
If you have read this guide and your issue is still unresolved, check our Troubleshooting guide before asking on the mailing list.
What to read next
- Working With Queues. This guide focuses on features consumer applications use heavily.
- Working With Exchanges. This guide focuses on features producer applications use heavily.
- Error handling
- Using TLS (SSL) (if you want to use an SSL encrypted connection to the broker)
Authors
This guide was written by Michael Klishin and edited by Chris Duncan.
Tell us what you think!
Please take a moment to tell us what you think about this guide on Twitter or the Ruby AMQP mailing list. Let us know what was unclear or what has not been covered. Maybe you do not like the guide style or grammar or discover spelling mistakes. Reader feedback is key to making the documentation better.
If, for some reason, you cannot use the communication channels mentioned above, you can contact the author of the guides directly.