redis_safe_queue
RedisSafeQueue is a transactional queue for ruby/redis. It guarantees at least once semantics.
The queue may be used with multiple producers and consumers, each job is removed
in a open/commit transaction; even if a worker dies while processing a job, it is
automatically requeued after the specified timeout
.
You can create multiple instances of the RedisSafeQueue class with the same
queue_id
(i.e. on multiple machines), they will share the same queue and
synchronize.
Each queue provides:
A
work
method that processes jobs in a loop and returns when the queue is empty. You may provide a maximum number of jobs as an integer argument and/or invoke thework
method in a loop to run a continuous queue.A
finish
method that blocks until the queue is empty and then yields only once. When multiple instances of the RedisSafeQueue class with the samequeue_id
exist (i.e. on multiple machines) only one instance will yield and the others will return without doing anything.
Example
require "rubygems"
require "redis_safe_queue"
$processed = 0
# create a new queue 'my_queue_1' with a 1-minute timeout, if a worker
# exceeds the timeout while processing a job it is implicitly requeued
my_queue = RedisSafeQueue.new(
:redis => Redis.new,
:queue_id => "my_queue_1",
:timeout => 60
)
# push 10.000 items to the queue
10_000.times do |i|
my_queue.push("fnord-#{rand(123)}")
end
# start 10 workers/threads
10.times { Thread.new {
# this will process jobs in parallel until the queue is empty
my_queue.work do |job|
$processed += 1
end
# this will yield only once, on one of the 10 threads
my_queue.finish do
puts "queue finished, processed #{$processed} jobs on 10 threads"
exit 0
end
} }
Limitations
- A job may be processed more than once if a worker takes longer than
timeout
to process a job (the transaction is only commited after the block passed towork
returns) - A job may be processed more than once if a redis network roundtrip takes longer than
TX_COMMIT_TIMEOUT
(default: 10 seconds) during thestart_tx
procedure - Since queue appends are not synchronized there is a known race condition that occurs when producing and consuming from the same queue (same
queue_id
) at the same time; in this case thework
method may return even though there are still jobs left in the queue. This can be mitigated by calling thework
method in a loop and never invokingfinish
. - In a distributed setup the system clocks must be in sync on all machines since redis_safe_queue implements a timestamp-based lock expiration mechanism
Installation
gem install redis_safe_queue
or in your Gemfile:
gem 'redis_safe_queue', '~> 0.1'
License
Copyright (c) 2011 Paul Asmuth
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to use, copy and modify copies of the Software, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.