Class: RedisSafeQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_safe_queue.rb

Constant Summary collapse

TX_COMMIT_TIMEOUT =

Redis must process all commands of a transaction within 10 seconds.

10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ RedisSafeQueue

Returns a new instance of RedisSafeQueue.



11
12
13
14
15
16
17
# File 'lib/redis_safe_queue.rb', line 11

# Builds a queue handle from an options hash.
#
# @param opts [Hash] configuration:
#   :redis        (required) client object stored in @redis
#   :redis_prefix (optional) key prefix; :redis_safe_queue is used when absent or falsy
#   :queue_id     (required) identifier stored in @queue_id
#   :timeout      (required) converted with #to_i and stored in @timeout
# @raise [KeyError] when a required option is missing (raised by Hash#fetch)
def initialize(opts)
  @redis = opts.fetch(:redis)

  # Fall back to the default prefix when the caller did not supply one.
  custom_prefix = opts[:redis_prefix]
  @prefix = custom_prefix ? custom_prefix : :redis_safe_queue

  @queue_id = opts.fetch(:queue_id)

  raw_timeout = opts.fetch(:timeout)
  @timeout = raw_timeout.to_i

  @iterations = 0
end

Instance Attribute Details

#iterations ⇒ Object

Returns the value of attribute iterations.



9
10
11
# File 'lib/redis_safe_queue.rb', line 9

# Reader for the @iterations instance variable.
#
# @return [Object] the current value of @iterations (the constructor sets it to 0)
def iterations
  @iterations
end

#prefix ⇒ Object

Returns the value of attribute prefix.



9
10
11
# File 'lib/redis_safe_queue.rb', line 9

# Reader for the @prefix instance variable.
#
# @return [Object] the value taken from opts[:redis_prefix] in the constructor,
#   or :redis_safe_queue when that option was absent or falsy
def prefix
  @prefix
end

#queue_id ⇒ Object

Returns the value of attribute queue_id.



9
10
11
# File 'lib/redis_safe_queue.rb', line 9

# Reader for the @queue_id instance variable.
#
# @return [Object] the value passed as opts[:queue_id] to the constructor
def queue_id
  @queue_id
end

#redis ⇒ Object

Returns the value of attribute redis.



9
10
11
# File 'lib/redis_safe_queue.rb', line 9

# Reader for the @redis instance variable.
#
# @return [Object] the client object passed as opts[:redis] to the constructor;
#   the other methods send it commands such as set, sadd, scard, setnx and expire
def redis
  @redis
end

#timeout ⇒ Object

Returns the value of attribute timeout.



9
10
11
# File 'lib/redis_safe_queue.rb', line 9

# Reader for the @timeout instance variable.
#
# @return [Integer] opts[:timeout] converted with #to_i by the constructor;
#   #finish passes it to @redis.expire as the TTL argument
def timeout
  @timeout
end

Instance Method Details

#finish ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/redis_safe_queue.rb', line 39

# Waits until the queue is empty, then runs the given block at most once
# across all callers and schedules the queue's keys for expiry.
#
# The "run once" guarantee comes from SETNX on the `finished` key: only the
# caller that manages to create the key yields to its block.
#
# The block is optional; without one the method still takes the lock and
# sets the expiries.
#
# @yield invoked only by the caller that acquired the `finished` lock
# @return [Object] the result of the last @redis.expire call
def finish
  # Poll once per second until no job ids are left in the queue set.
  sleep 1 while size > 0

  ttl_result = nil
  begin
    acquired = @redis.setnx(key(:finished), 1)
    yield if acquired && block_given?
  ensure
    # Always set the TTLs, even when the callback raises; otherwise the
    # `finished` key would stay in Redis forever.
    @redis.expire(key, @timeout)
    ttl_result = @redis.expire(key(:finished), @timeout)
  end
  ttl_result
end

#push(job_data) ⇒ Object



19
20
21
22
23
# File 'lib/redis_safe_queue.rb', line 19

# Enqueues a job: stores its payload under a per-job key, then adds the
# freshly generated job id to the queue's set.
#
# @param job_data [Object] payload handed to @redis.set as the value
# @return [Object] whatever @redis.sadd returns
def push(job_data)
  new_id = unique_id
  payload_key = key(new_id, :job)

  # Write the payload first so the id only becomes visible in the set
  # after its data exists.
  @redis.set(payload_key, job_data)
  @redis.sadd(key, new_id)
end

#size ⇒ Object



49
50
51
# File 'lib/redis_safe_queue.rb', line 49

# Number of job ids currently held in the queue's set.
#
# @return [Object] the result of SCARD on the queue key
def size
  queue_key = key
  @redis.scard(queue_key)
end

#work(max_jobs = nil) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/redis_safe_queue.rb', line 25

# Processes jobs one at a time, yielding each job to the caller's block.
#
# For every job: start_tx supplies an id, get_job loads the job, the block
# runs, and commit_tx is called with the id. Processing stops when start_tx
# returns a falsy value or when the job budget reaches zero.
#
# @param max_jobs [Integer, nil] maximum number of jobs to process; nil means no limit
# @yield [job] the value returned by get_job for the current id
# @return [nil]
def work(max_jobs = nil)
  remaining = max_jobs

  loop do
    # Budget exhausted (never true when no limit was given).
    break if remaining == 0

    current_id = start_tx
    break unless current_id

    yield(get_job(current_id))
    commit_tx(current_id)

    remaining -= 1 if remaining
  end
end