Class: RedisSafeQueue
- Inherits:
-
Object
- Object
- RedisSafeQueue
- Defined in:
- lib/redis_safe_queue.rb
Constant Summary collapse
- TX_COMMIT_TIMEOUT =
redis must process all commands within 10s
10
Instance Attribute Summary collapse
-
#iterations ⇒ Object
Returns the value of attribute iterations.
-
#prefix ⇒ Object
Returns the value of attribute prefix.
-
#queue_id ⇒ Object
Returns the value of attribute queue_id.
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary collapse
- #finish ⇒ Object
-
#initialize(opts) ⇒ RedisSafeQueue
constructor
A new instance of RedisSafeQueue.
- #push(job_data) ⇒ Object
- #size ⇒ Object
- #work(max_jobs = nil) ⇒ Object
Constructor Details
#initialize(opts) ⇒ RedisSafeQueue
Returns a new instance of RedisSafeQueue.
11 12 13 14 15 16 17 |
# File 'lib/redis_safe_queue.rb', line 11 def initialize(opts) @redis = opts.fetch(:redis) @prefix = opts[:redis_prefix] || :redis_safe_queue @queue_id = opts.fetch(:queue_id) @timeout = opts.fetch(:timeout).to_i @iterations = 0 end |
Instance Attribute Details
#iterations ⇒ Object
Returns the value of attribute iterations.
9 10 11 |
# File 'lib/redis_safe_queue.rb', line 9 def iterations @iterations end |
#prefix ⇒ Object
Returns the value of attribute prefix.
9 10 11 |
# File 'lib/redis_safe_queue.rb', line 9 def prefix @prefix end |
#queue_id ⇒ Object
Returns the value of attribute queue_id.
9 10 11 |
# File 'lib/redis_safe_queue.rb', line 9 def queue_id @queue_id end |
#redis ⇒ Object
Returns the value of attribute redis.
9 10 11 |
# File 'lib/redis_safe_queue.rb', line 9 def redis @redis end |
#timeout ⇒ Object
Returns the value of attribute timeout.
9 10 11 |
# File 'lib/redis_safe_queue.rb', line 9 def timeout @timeout end |
Instance Method Details
#finish ⇒ Object
39 40 41 42 43 44 45 46 47 |
# File 'lib/redis_safe_queue.rb', line 39 def finish sleep 1 while size > 0 lock = @redis.setnx(key(:finished), 1) yield if lock @redis.expire(key, @timeout) @redis.expire(key(:finished), @timeout) end |
#push(job_data) ⇒ Object
19 20 21 22 23 |
# File 'lib/redis_safe_queue.rb', line 19 def push(job_data) job_id = unique_id @redis.set(key(job_id, :job), job_data) @redis.sadd(key, job_id) end |
#size ⇒ Object
49 50 51 |
# File 'lib/redis_safe_queue.rb', line 49 def size @redis.scard(key) end |
#work(max_jobs = nil) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/redis_safe_queue.rb', line 25 def work(max_jobs = nil) loop do break if max_jobs == 0 if job_id = start_tx job = get_job(job_id) yield(job) commit_tx(job_id) max_jobs -= 1 if max_jobs else break end end end |