redis_safe_queue

RedisSafeQueue is a transactional queue for ruby/redis. It guarantees at least once semantics.

The queue may be used with multiple producers and consumers, each job is removed in a open/commit transaction; even if a worker dies while processing a job, it is automatically requeued after the specified timeout.

You can create multiple instances of the RedisSafeQueue class with the same queue_id (i.e. on multiple machines), they will share the same queue and synchronize.

Each queue provides:

  • A work method that processes jobs in a loop and returns when the queue is empty. You may provide a maximum number of jobs as an integer argument and/or invoke the work method in a loop to run a continuous queue.

  • A finish method that blocks until the queue is empty and then yields only once. When multiple instances of the RedisSafeQueue class with the same queue_id exist (i.e. on multiple machines) only one instance will yield and the others will return without doing anything.

Example

require "rubygems"
require "redis_safe_queue"

$processed = 0

# create a new queue 'my_queue_1' with a 1-minute timeout, if a worker 
# exceeds the timeout while processing a job it is implicitly requeued
my_queue = RedisSafeQueue.new(
  :redis => Redis.new,
  :queue_id => "my_queue_1",
  :timeout => 60
)

# push 10.000 items to the queue
10_000.times do |i|
  my_queue.push("fnord-#{rand(123)}")
end

# start 10 workers/threads
10.times { Thread.new {

  # this will process jobs in parallel until the queue is empty
  my_queue.work do |job|
    $processed += 1
  end

  # this will yield only once, on one of the 10 threads
  my_queue.finish do
    puts "queue finished, processed #{$processed} jobs on 10 threads"
    exit 0
  end

} }

Limitations

  • A job may be processed more than once if a worker takes longer than timeout to process a job (the transaction is only commited after the block passed to work returns)
  • A job may be processed more than once if a redis network roundtrip takes longer than TX_COMMIT_TIMEOUT (default: 10 seconds) during the start_tx procedure
  • Since queue appends are not synchronized there is a known race condition that occurs when producing and consuming from the same queue (same queue_id) at the same time; in this case the work method may return even though there are still jobs left in the queue. This can be mitigated by calling the work method in a loop and never invoking finish.
  • In a distributed setup the system clocks must be in sync on all machines since redis_safe_queue implements a timestamp-based lock expiration mechanism

Installation

gem install redis_safe_queue

or in your Gemfile:

gem 'redis_safe_queue', '~> 0.1'

License

Copyright (c) 2011 Paul Asmuth

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to use, copy and modify copies of the Software, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.