Ruby job queue backed by RabbitMQ. The word cuniculus comes from the scientific name of the European rabbit (Oryctolagus cuniculus).
Getting started
gem install cuniculus
The following minimal example assumes RabbitMQ is running on localhost:5672
; see the configuration section for how to change this.
Create a worker class:
# -- my_worker.rb
require 'cuniculus/worker'
class MyWorker
extend Cuniculus::Worker
# the queue name is not explicitly given, so "cun_default" is used.
def perform(arg1, arg2)
puts "Processing:"
puts "arg1: #{arg1.inspect}"
puts "arg2: #{arg2.inspect}"
end
end
Add jobs to queue:
MyWorker.perform_async('x', [1, 2, 3])
Start the job consumer:
cuniculus -r my_worker.rb
Benchmarks
The following measurements were performed with the bin/run_benchmarks
utility, with different command parameters. Run it with -h
to see its usage.
To simulate network latency, Toxiproxy was used. It needs to be started with toxiproxy-server
before running the benchmarks.
Network latency (ms) | Prefetch count | Throughput (jobs/s) | Average latency (ms) |
---|---|---|---|
1 | 65535 (max. allowed) | 10225 | 2 |
10 | 65535 (max. allowed) | 9990 | 13 |
1 | 50 | 8051 | 2 |
10 | 50 | 2500 | 13 |
100 | 50 | 481 | 103 |
1 | 10 (default) | 5266 | 2 |
10 | 10 (default) | 807 | 13 |
1 | 1 | 481 | 2 |
10 | 1 | 81 | 13 |
Additional benchmark parameters:
- throughput was measured by consuming 100k jobs;
- job latency was averaged over 200 samples;
- Ruby 2.7.2 was used.
Several remarks can be made:
- Higher prefetch counts lead to higher throughput, but there are downsides of having it too high; see this reference on how to properly tune it.
- Network latency has a severe impact on the throughput, and the effect is larger the smaller the prefetch count is.
Configuration
Configuration is done through code, using Cuniculus.configure
.
Example:
require "cuniculus"
# The following Hash is passed as is to Bunny, the library that integrates with RabbitMQ.
rabbitmq_conn = {
host: 'rabbitmq', # default is 127.0.0.1
port: 5672,
ssl: false,
vhost: '/',
user: 'guest',
pass: 'guest',
auth_mechanism: 'PLAIN',
}
Cuniculus.configure do |cfg|
cfg.rabbitmq_opts = rabbitmq_conn
cfg.pub_pool_size = 5 # Only affects job producers
cfg.dead_queue_ttl = 1000 * 60 * 60 * 24 * 30 # keep failed jobs for 30 days
cfg.add_queue({ name: "critical", durable: true, max_retry: 10, prefetch_count: 1})
end
To configure the queue used by a worker, use cuniculus_options
:
class MyWorker
include Cuniculus::Worker
queue: "critical"
def perform
# code
end
end
More examples
There is also a more complete example in the Cuniculus repository itself. To run it, clone the repository, then
- start the Ruby and RabbitMQ containers using Docker Compose:
docker-compose up -d
- from within the cuniculus container, produce a job:
ruby -Ilib examples/produce.rb
- also from within the container, start the consumer:
bin/cuniculus -I examples/ -r example/init_cuniculus.rb
The -I examples
option adds the examples/
directory into the load path, and -r example/init_cuniculus.rb
requires init_cuniculus.rb
prior to starting the consumer. The latter is where configurations such as that described in the configuration section section should be.
Error handling
By default, exceptions raised when consuming a job are logged to STDOUT. This can be overriden with the Cuniculus.error_handler
method:
Cuniculus.error_handler do |e|
puts "Oh nein! #{e}"
LoggingService.send(e)
end
The method expects a block that will receive an exception, and run in the scope of the Worker instance.
Publisher proper shutdown
When perform_async
is called, the job is first put into a local (in-memory) queue that is published to RabbitMQ by a worker in a worker pool (the size of which is configured with config.pub_pool_size
).
To ensure Cuniculus tries to finish publishing jobs on shutdown, it's important that Cuniculus.shutdown
is called. Once this method is called, workers have a grace period to publish enqueued jobs, after which the shutdown is forced. The period is set in seconds in config.pub_shutdown_grace_period
(defaults to 50).
Example code for the Puma web server:
on_worker_shutdown do
Cuniculus.shutdown
end
Retry mechanism
Retries are enabled by default (with 8 retries) with an exponential backoff, meaning the time between retries increases the more failures happen. The formula for calculating the times between retries can be found in Cuniculus::QueueConfig, namely in the x-message-ttl
line. As an example, the time between the 7th and 8th retries is roughly 29 days.
Given a queue in the configuration, Cuniculus declares on RabbitMQ the corresponding base queue, in addition to its retry queues. As an example, let's consider the default queue cun_default
: Cuniculus declares a cun_default
queue, together with some cun_default_{n}
queues used for job retries.
When a job raises an exception, it is placed into the cun_default_1
queue for the first retry. It stays there for some pre-defined time, and then gets moved back into the cun_default
queue for execution.
If it fails again, it gets moved to cun_default_2
, where it stays for a longer period until it's moved back directly into the cun_default
queue again.
This goes on until there are no more retry attempts, in which case the job gets moved into the cun_dead
queue. It can be then only be moved back into the cun_default
queue manually (from RabbitMQ itself, not with Cuniculus); otherwise it is discarded after some time, defined as the Cuniculus::Config#dead_queue_ttl, in milliseconds (by default, 180 days).
Note that if a job cannot even be parsed, it is moved straight to the dead queue, as there's no point in retrying.
Health check plugin
Cuniculus ships with a health check plugin. When enabled, a Rack server is started (therefore the Rack gem is required, as well as the used handler), which responds with 200 OK
upon receiving a request in the configured port and path.
Enable it with Cuniculus.plugin(:health_check)
, which binds the server to 0.0.0.0:3000
, listening on the /healthcheck
path. To configure the server, pass additional options:
Cuniculus.plugin(:health_check, { "bind_to" => "127.0.0.1", "port" => 3003, "path" => "ping" })
Check Cuniculus::Plugins::HealthCheck for further details.
Note that the default handler "webrick" is not bundled by default with Ruby 3 and needs to be installed separately, if it is to be used.
How it works
Cuniculus code and conventions are very much inspired by another Ruby job queue library: Sidekiq.
To communicate with RabbitMQ, Cuniculus uses Bunny.
The first time an async job is produced, a thread pool is created, each thread with its own communication channel to RabbitMQ. These threads push jobs to RabbitMQ.
For consuming, each queue will have a corresponding thread pool (handled by Bunny) for concurrency.
License
Cuniculus is licensed under the "BSD 2-Clause License". See LICENSE for details.