Class: Qu::Backend::Redis
- Inherits:
-
Base
- Object
- Base
- Qu::Backend::Redis
- Defined in:
- lib/qu/backend/redis.rb
Instance Attribute Summary collapse
-
#namespace ⇒ Object
Returns the value of attribute namespace.
Instance Method Summary collapse
- #clear(queue = nil) ⇒ Object
- #clear_workers ⇒ Object
- #completed(payload) ⇒ Object
- #connection ⇒ Object (also: #redis)
- #enqueue(payload) ⇒ Object
- #failed(payload, error) ⇒ Object
-
#initialize ⇒ Redis
constructor
A new instance of Redis.
- #length(queue = 'default') ⇒ Object
- #queues ⇒ Object
- #register_worker(worker) ⇒ Object
- #release(payload) ⇒ Object
- #reserve(worker, options = {:block => true}) ⇒ Object
- #unregister_worker(worker) ⇒ Object
- #workers ⇒ Object
Constructor Details
#initialize ⇒ Redis
Returns a new instance of Redis.
9 10 11 |
# File 'lib/qu/backend/redis.rb', line 9 def initialize self.namespace = :qu end |
Instance Attribute Details
#namespace ⇒ Object
Returns the value of attribute namespace.
7 8 9 |
# File 'lib/qu/backend/redis.rb', line 7 def namespace @namespace end |
Instance Method Details
#clear(queue = nil) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/qu/backend/redis.rb', line 31 def clear(queue = nil) queue ||= queues + ['failed'] logger.info { "Clearing queues: #{queue.inspect}" } Array(queue).each do |q| logger.debug "Clearing queue #{q}" while id = redis.lpop("queue:#{q}") logger.debug "Clearing job #{id}" redis.del("job:#{id}") end redis.srem('queues', q) end end |
#clear_workers ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/qu/backend/redis.rb', line 90 def clear_workers logger.info "Clearing workers" while id = redis.spop(:workers) logger.debug "Clearing worker #{id}" redis.del("worker:#{id}") end end |
#completed(payload) ⇒ Object
70 71 72 |
# File 'lib/qu/backend/redis.rb', line 70 def completed(payload) redis.del("job:#{payload.id}") end |
#connection ⇒ Object Also known as: redis
13 14 15 |
# File 'lib/qu/backend/redis.rb', line 13 def connection @connection ||= ::Redis::Namespace.new(namespace, :redis => ::Redis.connect(:url => ENV['REDISTOGO_URL'])) end |
#enqueue(payload) ⇒ Object
18 19 20 21 22 23 24 25 |
# File 'lib/qu/backend/redis.rb', line 18 def enqueue(payload) payload.id = SimpleUUID::UUID.new.to_guid redis.set("job:#{payload.id}", encode('klass' => payload.klass.to_s, 'args' => payload.args)) redis.rpush("queue:#{payload.queue}", payload.id) redis.sadd('queues', payload.queue) logger.debug { "Enqueued job #{payload}" } payload end |
#failed(payload, error) ⇒ Object
66 67 68 |
# File 'lib/qu/backend/redis.rb', line 66 def failed(payload, error) redis.rpush("queue:failed", payload.id) end |
#length(queue = 'default') ⇒ Object
27 28 29 |
# File 'lib/qu/backend/redis.rb', line 27 def length(queue = 'default') redis.llen("queue:#{queue}") end |
#queues ⇒ Object
44 45 46 |
# File 'lib/qu/backend/redis.rb', line 44 def queues Array(redis.smembers('queues')) end |
#register_worker(worker) ⇒ Object
74 75 76 77 78 |
# File 'lib/qu/backend/redis.rb', line 74 def register_worker(worker) logger.debug "Registering worker #{worker.id}" redis.set("worker:#{worker.id}", encode(worker.attributes)) redis.sadd(:workers, worker.id) end |
#release(payload) ⇒ Object
62 63 64 |
# File 'lib/qu/backend/redis.rb', line 62 def release(payload) redis.rpush("queue:#{payload.queue}", payload.id) end |
#reserve(worker, options = {:block => true}) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/qu/backend/redis.rb', line 48 def reserve(worker, = {:block => true}) queues = worker.queues.map {|q| "queue:#{q}" } logger.debug { "Reserving job in queues #{queues.inspect}"} if [:block] id = redis.blpop(*queues.push(0))[1] else queues.detect {|queue| id = redis.lpop(queue) } end get(id) if id end |
#unregister_worker(worker) ⇒ Object
80 81 82 83 84 |
# File 'lib/qu/backend/redis.rb', line 80 def unregister_worker(worker) logger.debug "Unregistering worker #{worker.id}" redis.del("worker:#{worker.id}") redis.srem('workers', worker.id) end |
#workers ⇒ Object
86 87 88 |
# File 'lib/qu/backend/redis.rb', line 86 def workers Array(redis.smembers(:workers)).map { |id| worker(id) }.compact end |