quredis

quredis (pronounced ‘kurdis’) is a little DSL for building safe Redis queues with brpoplpush. The brpoplpush command in Redis atomically pops a message from one list and pushes it onto another list. This primitive is exactly what we need to build fail-safe queues.

A quredis queue is identified by its name and employs three Redis lists. The names of these lists can be configured, but by default they follow the naming convention described below.

List           Default name          Description
Ingress queue  ingress:queue_name    Messages are enqueued here
Transit queue  transit:queue_name    Messages are held here while they are being processed
Escape queue   escape:queue_name     Messages that failed to be processed successfully
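The naming convention amounts to a small lookup with per-list overrides. The helper below, quredis_list_names, is hypothetical and not part of the quredis API; it only illustrates how the defaults could combine with the :ingress, :transit and :escape options that the quredis method accepts.

```ruby
# Hypothetical helper (not part of quredis): resolve the three list names
# for a queue, falling back to the "<type>:<queue_name>" convention.
LIST_TYPES = %i[ingress transit escape].freeze

def quredis_list_names(name, options = {})
  LIST_TYPES.to_h do |type|
    # An explicit option wins; otherwise use e.g. "ingress:shop".
    [type, (options[type] || "#{type}:#{name}").to_s]
  end
end

puts quredis_list_names(:shop)[:transit]                # prints: transit:shop
puts quredis_list_names(:mail, escape: :trash)[:escape] # prints: trash
```

Lists that are not overridden keep their default names, so quredis_list_names(:mail, escape: :trash) still uses ingress:mail and transit:mail.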

Producer

Messages are enqueued with the Redis lpush command:

redis.lpush('ingress:queue_name', message)
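Because lpush adds to the head of a list and brpoplpush takes from its tail, messages are consumed in the order they were enqueued. A minimal sketch of that ordering, assuming a hypothetical in-memory FakeRedis that mimics the two commands (it never blocks and is not a Redis client):

```ruby
# Hypothetical in-memory stand-in for the two Redis list commands used here.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: add the value to the head (left) of the list; returns the new length.
  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # BRPOPLPUSH without blocking: pop from the tail (right) of source and
  # push onto the head of destination; returns nil when source is empty.
  def brpoplpush(source, destination, _timeout = 0)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end
end

redis = FakeRedis.new
%w[first second third].each { |msg| redis.lpush("ingress:shop", msg) }

consumed = []
while (msg = redis.brpoplpush("ingress:shop", "transit:shop", 1))
  consumed << msg
end
puts consumed.inspect # prints ["first", "second", "third"]
```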

Consumer

To handle messages, include the Quredis module. The quredis(name, options = {}, &block) method sets things up, and the block passed to it handles each message. Call the start method and let the messages flow.

class Shop
  include Quredis

  def initialize
    quredis :shop do |customer|
      puts "Now serving #{customer}."
    end
  end
end

Shop.new.start
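To make the DSL shape concrete, here is a hypothetical sketch of how a mixin with this signature could capture the block and invoke it per message. MiniQuredis and its handle_message method are illustrative names, not the real Quredis internals, and the Redis-backed start loop is deliberately left out.

```ruby
# Hypothetical sketch of a quredis-style mixin: it only shows how the block
# passed to quredis(name, options = {}, &block) can be stored and later
# called for each message. The Redis-backed start loop is omitted.
module MiniQuredis
  def quredis(name, options = {}, &handler)
    raise ArgumentError, "a block is required to handle messages" unless handler

    @quredis_name = name
    @quredis_options = options
    @quredis_handler = handler
  end

  # Would be called by the (omitted) consumer loop for every dequeued message.
  def handle_message(message)
    @quredis_handler.call(message)
  end
end

class Shop
  include MiniQuredis

  attr_reader :served

  def initialize
    @served = []
    quredis :shop do |customer|
      # The block is a closure, so self here is still the Shop instance.
      @served << customer
      puts "Now serving #{customer}."
    end
  end
end

shop = Shop.new
shop.handle_message("alice") # prints: Now serving alice.
```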

Custom list names can be passed as options to the quredis method:

quredis :mail, 
  {:ingress => :read_later, :transit => :reading_now, :escape => :trash} do |message|
  puts "now reading #{message}"
end

Quredis CLI

Quredis comes equipped with a CLI. It supports listing queues, purging a list in a queue, and retrying messages stuck in the transit and escape queues.

> quredis help
Tasks:
  quredis destroy queue_name           # destroy the specified queue queue
  quredis help [TASK]                  # Describe available tasks or one spec...
  quredis ls                           # list queues
  quredis purge queue_name queue_type  # purges queue_name queue_type
  quredis retry queue_name queue_type  # re-enqueues queue type
  quredis version                      # version
  quredis web                          # start webapp

To purge the escape queue of the mail queue from the consumer example above:

> quredis purge mail escape

Note that this removes all messages in the escape queue ('trash').

To retry the escape queue of the mail queue from the consumer example above:

> quredis retry mail escape

This re-enqueues all the messages from the escape queue ('trash') onto the ingress queue ('read_later').
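In terms of Redis list commands, the two operations could be expressed as follows. This is an assumption about what purge and retry amount to, not the CLI's actual code: purge is a single DEL of the list, and retry drains the escape list into the ingress list with repeated rpoplpush, so every message is always in one of the two lists. FakeRedis, purge and retry_all are hypothetical names; the list names come from the mail example.

```ruby
# Hypothetical in-memory stand-in for the Redis list commands used below.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: add to the head of the list; returns the new length.
  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # RPOPLPUSH: pop from the tail of source, push onto the head of destination.
  def rpoplpush(source, destination)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end

  # DEL: drop the whole list; returns the number of keys removed.
  def del(key)
    @lists.delete(key) ? 1 : 0
  end

  # LRANGE: elements from start to stop (inclusive; negative indexes count from the end).
  def lrange(key, start, stop)
    @lists.fetch(key, [])[start..stop] || []
  end
end

# Purge: delete every message in one list (e.g. the escape list "trash").
def purge(redis, list)
  redis.del(list)
end

# Retry: move messages from escape back to ingress one atomic RPOPLPUSH at a
# time; returns how many messages were moved.
def retry_all(redis, escape, ingress)
  moved = 0
  moved += 1 while redis.rpoplpush(escape, ingress)
  moved
end

redis = FakeRedis.new
%w[m1 m2].each { |m| redis.lpush("trash", m) }
puts retry_all(redis, "trash", "read_later") # prints: 2
```

Moving messages one at a time trades speed for safety: a crash midway through a retry leaves each message in either the escape list or the ingress list, never in neither.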

> quredis web

Launches a web interface to monitor quredis queues. See the next section for details.

Quredis Web

quredis web starts a web app to monitor quredis queues. You can specify the Redis host and port, the web app's bind address and port, and the Rack server via command-line options:

> quredis help web
Usage:
quredis web

Options:
  -H, [--redis-host=REDIS_HOST]  # redis server host
  -P, [--redis-port=N]           # redis server port
  -b, [--bind=BIND]              # webapp IP address to bind to (default: 0.0.0.0)
  -p, [--port=N]                 # webapp Port to listen on
  -s, [--server=SERVER]          # specify rack server/handler (default is thin)

start webapp


Configuring Redis

To configure the Redis connection, provide a method called redis.

Configuring Logger

Provide a method called logger to plug in a custom logger.
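Both hooks can be plain instance methods on the consumer class, assuming quredis looks them up on the including object as the two notes above suggest. Mailer is a hypothetical class; a real consumer would also include Quredis, which is left out here so the sketch runs on its own, and the Redis client is injected so no server is needed.

```ruby
require "logger"
require "stringio"

# Hypothetical consumer class showing the two configuration hooks.
# A real consumer would also `include Quredis`; omitted so this runs alone.
class Mailer
  # With the redis gem you would typically pass something like
  # Redis.new(host: "127.0.0.1", port: 6379) as redis_client.
  def initialize(redis_client, log_io = $stdout)
    @redis_client = redis_client
    @log_io = log_io
  end

  # Hook: the Redis connection to use.
  def redis
    @redis_client
  end

  # Hook: a custom logger, memoized so every call returns the same instance.
  def logger
    @logger ||= Logger.new(@log_io).tap { |log| log.progname = "mailer" }
  end
end

buffer = StringIO.new
mailer = Mailer.new(:fake_connection, buffer)
mailer.logger.info("consumer configured")
```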

Under the Hood

Quredis atomically moves messages from the ingress list, ingress:queue_name, to the transit list, transit:queue_name, using brpoplpush. The operation returns the message it moved. The application code then processes the message and deletes it from the transit list with the Redis lrem command.

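# Atomically move the next message from ingress to transit,
# blocking for up to timeout seconds (nil if none arrives).
message = redis.brpoplpush(ingress, transit, timeout)
if message
  handle_message(message)          # application code
  redis.lrem(transit, -1, message) # done: drop it from transit
end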

The snippet above handles normal execution. To handle cases where the application code crashes or the hardware fails, quredis introduces a third list, the escape list. If handle_message(message) raises an exception, the message is atomically removed from the transit list and added to the escape list. If the application crashes or the hardware fails while handling the message, the message stays in the transit list to be processed later. When the application starts again, quredis runs a recovery operation that uses rpoplpush to move each message from the transit list to the escape list and then handles it. A message that is handled successfully is removed from the escape list; otherwise it stays there.

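# Recovery on start: atomically move each message left in transit to escape,
# then handle it; a successfully handled message is removed from escape.
while (message = redis.rpoplpush(transit, escape))
  handle_message(message)
  redis.lrem(escape, -1, message)
end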
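The recovery loop can be exercised end to end against a hypothetical in-memory FakeRedis (not a Redis client; its lrem supports only count -1). recover is an illustrative name for the loop above, with the message handler passed as a block.

```ruby
# Hypothetical in-memory stand-in for the Redis list commands used below.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: add to the head of the list; returns the new length.
  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # RPOPLPUSH: pop from the tail of source, push onto the head of destination.
  def rpoplpush(source, destination)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end

  # LREM with count -1: remove the occurrence nearest the tail.
  def lrem(key, count, value)
    raise ArgumentError, "sketch only supports count = -1" unless count == -1

    index = @lists[key].rindex(value)
    return 0 if index.nil?

    @lists[key].delete_at(index)
    1
  end

  # LRANGE: elements from start to stop (inclusive; negative indexes count from the end).
  def lrange(key, start, stop)
    @lists.fetch(key, [])[start..stop] || []
  end
end

# Mirrors the recovery loop: move each stranded message to escape first,
# then handle it; only a successfully handled message leaves escape.
def recover(redis, transit, escape)
  recovered = 0
  while (message = redis.rpoplpush(transit, escape))
    yield message # if this raises or the process dies, message stays in escape
    redis.lrem(escape, -1, message)
    recovered += 1
  end
  recovered
end

redis = FakeRedis.new
redis.lpush("transit:shop", "order-42") # left behind by a crashed consumer
handled = []
recover(redis, "transit:shop", "escape:shop") { |message| handled << message }
puts handled.inspect # prints ["order-42"]
```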

Recovery on start also automates the processing of any messages that were in flight during deployments, application migrations, shutdowns, and restarts.

Fault Tolerance

Quredis tolerates the following faults:

Bad Message

Quredis ensures that a bad message does not stop the system from moving forward. During normal execution, if a StandardError or Timeout::Error is raised while handling a message, quredis atomically moves the message from the transit queue to the escape queue. It can later be dealt with using the admin tool, which lets you either delete the message from the escape queue or re-enqueue it onto the ingress queue.
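The bad-message path can be sketched as a single processing step. This assumes the atomic transit-to-escape move is an lrem plus lpush inside a MULTI transaction; the text only states that the move is atomic. FakeRedis and process_one are hypothetical names, and the fake's multi runs the commands in order without real transactional isolation.

```ruby
require "timeout"

# Hypothetical in-memory stand-in for the Redis commands used below.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # Non-blocking stand-in for BRPOPLPUSH: tail of source -> head of destination.
  def brpoplpush(source, destination, _timeout = 0)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end

  # LREM with count -1: remove the occurrence nearest the tail.
  def lrem(key, count, value)
    raise ArgumentError, "sketch only supports count = -1" unless count == -1

    index = @lists[key].rindex(value)
    return 0 if index.nil?

    @lists[key].delete_at(index)
    1
  end

  # Stand-in for MULTI/EXEC: runs the block's commands in order, but is not
  # isolated from other clients the way a real Redis transaction is.
  def multi
    yield self
  end

  def lrange(key, start, stop)
    @lists.fetch(key, [])[start..stop] || []
  end
end

# Process one message. If the handler raises StandardError or Timeout::Error
# (the latter is already a StandardError in current Ruby), move the message
# from transit to escape in one transaction so the queue keeps moving.
def process_one(redis, ingress, transit, escape, timeout = 1)
  message = redis.brpoplpush(ingress, transit, timeout)
  return nil if message.nil?

  begin
    yield message
  rescue StandardError, Timeout::Error
    redis.multi do |tx|
      tx.lrem(transit, -1, message)
      tx.lpush(escape, message)
    end
    return :escaped
  end

  redis.lrem(transit, -1, message)
  :handled
end

redis = FakeRedis.new
%w[good bad].each { |m| redis.lpush("ingress:mail", m) }
handler = proc { |m| raise ArgumentError, "cannot handle #{m}" if m == "bad" }
results = Array.new(2) { process_one(redis, "ingress:mail", "transit:mail", "escape:mail", &handler) }
p results # prints [:handled, :escaped]
```

Removing the message from transit after a successful handler call sits outside the rescue, so a Redis error during that cleanup is not mistaken for a bad message.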

Application Failure

Quredis ensures that a non-standard error, e.g. NoMemoryError, does not stop the system from moving forward. Suppose a message causes a NoMemoryError that crashes the app, and a process monitor such as monit, bluepill, god, supervisor or upstart restarts the application. On restart, the recovery operation moves the message from the transit queue to the escape queue and calls handle_message(message) again. If the app crashes again, the message stays in the escape queue, so the next application restart will not fail because of it.

Hardware Failure

Quredis moves messages between queues atomically, which makes it tolerant of hardware failures on the application host. Quredis does not protect against hardware problems on the Redis server itself.

The quredis admin tool lets you delete or re-enqueue messages in the transit and escape queues.