quredis
quredis (pronounced ‘kurdis’) is a little DSL for building safe queues on Redis using brpoplpush. The brpoplpush command in Redis atomically pops a message from one list and pushes it onto another list. This primitive is exactly what we need to build fail-safe queues.
A quredis queue is identified by its name and employs three Redis lists. Each list has a default name that follows the naming convention below, and each name can be overridden.
Name | Default Name | Description |
---|---|---|
Ingress Queue | ingress:queue_name | Messages are enqueued here |
Transit Queue | transit:queue_name | Messages are held here while they are being processed |
Escape Queue | escape:queue_name | Messages that failed processing are kept here |
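The naming convention can be sketched in a few lines of Ruby. The helper `list_names` and the constant `QUEUE_TYPES` are hypothetical names used only for illustration; they are not part of the quredis API, and the gem may resolve names differently.

```ruby
# Hypothetical helper (not part of quredis): resolves the three list names
# for a queue, falling back to the "type:queue_name" convention whenever no
# override is supplied in options.
QUEUE_TYPES = [:ingress, :transit, :escape].freeze

def list_names(queue_name, options = {})
  QUEUE_TYPES.each_with_object({}) do |type, names|
    names[type] = (options[type] || "#{type}:#{queue_name}").to_s
  end
end

# Default names for a queue called :shop.
p list_names(:shop)

# Only the ingress list is overridden; the other two keep their defaults.
p list_names(:mail, :ingress => :read_later)
```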
Producer
Messages are enqueued using the Redis lpush command.
redis.lpush('ingress:queue_name', )
Consumer
To handle messages, include the Quredis module. The quredis(name, options = {}, &block) method sets things up, and the block passed to it handles each message. Call the start method and let the messages flow.
class Shop
  include Quredis
  def initialize
    quredis :shop do |customer|
      puts "Now serving #{customer}."
    end
  end
end
Shop.new.start
Custom queue names can be passed as options to the quredis method:
quredis :mail,
{:ingress => :read_later, :transit => :reading_now, :escape => :trash} do ||
puts "now reading #{}"
end
Quredis CLI
Quredis comes equipped with a CLI. It supports listing queues, purging one of a queue's lists, and retrying messages stuck in the transit and escape queues.
> quredis help
Tasks:
quredis destroy queue_name # destroy the specified queue queue
quredis help [TASK] # Describe available tasks or one spec...
quredis ls # list queues
quredis purge queue_name queue_type # purges queue_name queue_type
quredis retry queue_name queue_type # re-enqueues queue type
quredis version # version
quredis web # start webapp
To purge the escape queue of the mail queue in the consumer example above:
> quredis purge mail escape
Note that this will remove all messages in the escape queue ('trash').
To retry the escape queue of the mail queue in the consumer example above:
> quredis retry mail escape
It will re-enqueue all the messages from the escape queue ('trash') to the ingress queue ('read_later').
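Conceptually, a retry drains one list into another. The sketch below shows one way to do that with rpoplpush; it is an assumption about the approach, not the CLI's actual implementation. `FakeRedis` is a hypothetical in-memory stand-in for a Redis connection so the example runs without a server, and `retry_all` is an illustrative name.

```ruby
# In-memory stand-in for the Redis list commands used below (an assumption
# for illustration; it is not part of quredis or of any Redis client).
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: insert at the head of the list.
  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # RPOPLPUSH: pop the tail of source and push it onto the head of
  # destination; returns nil when source is empty.
  def rpoplpush(source, destination)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end

  def list(key)
    @lists[key].dup
  end
end

# Hypothetical helper: moves every message from one list to another and
# returns how many were moved.
def retry_all(redis, from, to)
  moved = 0
  moved += 1 while redis.rpoplpush(from, to)
  moved
end

redis = FakeRedis.new
redis.lpush('trash', 'letter 1')
redis.lpush('trash', 'letter 2')

puts retry_all(redis, 'trash', 'read_later')
p redis.list('read_later')
```

Because each message is popped from the tail and pushed onto the head, the oldest message ends up at the tail of the ingress list and is consumed first, so the original order is preserved.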
> quredis web
Launches a web interface to monitor quredis queues. See next section for details.
Quredis Web
quredis web starts a web app to monitor quredis queues. You can specify the Redis host and port, the web app port, and other settings via command-line arguments.
> quredis help web
Usage:
quredis web
Options:
-H, [--redis-host=REDIS_HOST] # redis server host
-P, [--redis-port=N] # redis server port
-b, [--bind=BIND] # webapp IP address to bind to (default: 0.0.0.0)
-p, [--port=N] # webapp Port to listen on
-s, [--server=SERVER] # specify rack server/handler (default is thin)
start webapp
Configuring Redis
To configure the Redis connection, provide a method called redis.
Configuring Logger
Provide a method called logger to plug in a custom logger.
Under the Hood
Quredis moves messages from the ingress list, ingress:queue_name, to a transit list, transit:queue_name, atomically using brpoplpush. The operation returns the message that was moved from the ingress list to the transit list. Next, the application code processes the message and deletes it from the transit list using the Redis lrem command.
message = redis.brpoplpush(ingress, transit, timeout)
if message
  handle_message(message)
  redis.lrem(transit, -1, message)
end
The code snippet above handles normal execution. To handle cases where the application code crashes or there is a hardware failure, Quredis introduces a third list, called the escape list. If handle_message(message) fails because it throws an exception, the message is atomically removed from the transit list and added to the escape list. If the application crashes or the hardware fails while handling the message, the message stays in the transit list to be processed later. When the application starts again, quredis runs a recovery operation that uses rpoplpush to move messages from the transit list to the escape list and then handles each one. If a message is handled normally, it is removed from the escape queue; otherwise it stays in the escape queue.
while (message = redis.rpoplpush(transit, escape))
  handle_message(message)
  redis.lrem(escape, -1, message)
end
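To see why a crashing message ends up parked in the escape list, the recovery loop can be run against an in-memory stand-in. This is a minimal sketch under stated assumptions: `FakeRedis`, `recover`, and `SimulatedCrash` are hypothetical names introduced here, and the stand-in models lrem only for a count of -1.

```ruby
# In-memory stand-in for the Redis list commands used below (an assumption
# for illustration; it is not part of quredis or of any Redis client).
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: insert at the head of the list.
  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # RPOPLPUSH: pop the tail of source, push it onto the head of destination.
  def rpoplpush(source, destination)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end

  # LREM, modelling only count = -1: remove the match closest to the tail.
  def lrem(key, _count, value)
    index = @lists[key].rindex(value)
    return 0 if index.nil?
    @lists[key].delete_at(index)
    1
  end

  def list(key)
    @lists[key].dup
  end
end

# Stands in for a fatal, non-StandardError failure such as NoMemoryError.
class SimulatedCrash < Exception; end

# Same shape as the recovery loop above; the block plays handle_message.
def recover(redis, transit, escape)
  while (message = redis.rpoplpush(transit, escape))
    yield message
    redis.lrem(escape, -1, message)
  end
end

redis = FakeRedis.new
redis.lpush('transit:shop', 'good')    # left in transit by an earlier crash
redis.lpush('transit:shop', 'poison')

begin
  recover(redis, 'transit:shop', 'escape:shop') do |message|
    raise SimulatedCrash if message == 'poison'
  end
rescue SimulatedCrash
  # A real process would die here; the message is already in the escape list.
end

p redis.list('transit:shop')
p redis.list('escape:shop')
```

After the simulated crash the transit list is empty, so a second recovery pass finds nothing to move and the poison message no longer blocks startup.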
Recovery on start also automates the processing of any messages that were in flight during a deployment, application migration, or other shutdown and restart.
Fault Tolerance
Quredis is tolerant of the following faults:
Bad Message
Quredis ensures that a bad message will not prevent the system from moving forward. During normal execution, if StandardError or Timeout::Error is raised while handling a message, quredis atomically removes the message from the transit queue and moves it to the escape queue. It can later be handled using the admin tool, which allows you to either delete the message from the escape queue or re-enqueue it to the ingress queue.
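The bad-message path can be sketched as follows. This is an illustration under stated assumptions, not the gem's code: `FakeRedis` is a hypothetical in-memory stand-in, `process_one` is an illustrative name, and the two-step move to the escape list here is not atomic (against a real server the two commands would need to run in a transaction to match the behaviour described above).

```ruby
require 'timeout'

# In-memory stand-in for the Redis list commands used below (an assumption
# for illustration; it is not part of quredis or of any Redis client).
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: insert at the head of the list.
  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # RPOPLPUSH: non-blocking here; the real BRPOPLPUSH waits up to a timeout.
  def rpoplpush(source, destination)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end

  # LREM, modelling only count = -1: remove the match closest to the tail.
  def lrem(key, _count, value)
    index = @lists[key].rindex(value)
    return 0 if index.nil?
    @lists[key].delete_at(index)
    1
  end

  def list(key)
    @lists[key].dup
  end
end

# Hypothetical helper: handles at most one message and reports the outcome.
# The block plays the role of the message handler.
def process_one(redis, ingress, transit, escape)
  message = redis.rpoplpush(ingress, transit)
  return nil if message.nil?

  begin
    yield message
    outcome = :handled
  rescue StandardError, Timeout::Error
    redis.lpush(escape, message)   # park the bad message for later inspection
    outcome = :escaped
  end
  redis.lrem(transit, -1, message) # the message leaves transit either way
  outcome
end

redis = FakeRedis.new
%w[alice poison bob].each { |name| redis.lpush('ingress:shop', name) }

3.times do
  process_one(redis, 'ingress:shop', 'transit:shop', 'escape:shop') do |customer|
    raise ArgumentError, "cannot serve #{customer}" if customer == 'poison'
  end
end

p redis.list('escape:shop')
p redis.list('transit:shop')
```

Only the failing message lands in the escape list; the messages before and after it are handled normally and the transit list ends up empty.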
Application Failure
Quredis ensures that a non-standard error, e.g., NoMemoryError, will not prevent the system from moving forward. Suppose a message causes a NoMemoryError that crashes the app, and a monitoring process such as monit, blue pill, god, supervisor, or upstart restarts the application. On restart, the recovery operation will move the message from the transit queue to the escape queue and call handle_message(message). If the app crashes again, the message stays in the escape queue, and the next application restart will not fail due to this error.
Hardware Failure
Quredis ensures that messages are moved between queues atomically, which makes it fault tolerant against hardware failure on the application side. Quredis doesn’t protect against hardware problems on the Redis server itself.
The Quredis admin tool allows deleting and re-enqueuing messages from the transit and escape queues.