Class: Qu::Backend::Memory
- Inherits:
- Base
- Object
- Base
- Qu::Backend::Memory
- Defined in:
- lib/qu/backend/memory.rb
Instance Method Summary
- #clear(queue = nil) ⇒ Object
- #clear_workers ⇒ Object
- #completed(payload) ⇒ Object
- #connection ⇒ Object
- #delayed_push(time, payload) ⇒ Object
- #enqueue(payload) ⇒ Object
- #failed(payload, error) ⇒ Object
- #get_queue_by_klass(klass) ⇒ Object
- #get_queue_by_name(queue = 'default') ⇒ Object
- #get_schedule_by_klass(klass) ⇒ Object
- #initialize ⇒ Memory (constructor): A new instance of Memory.
- #length(queue = 'default') ⇒ Object
- #queues ⇒ Object
- #register_worker(worker) ⇒ Object
- #release(payload) ⇒ Object
- #remove_delayed(klass, *args) ⇒ Object
- #requeue(id) ⇒ Object
- #reserve(worker, options = {:block => true}) ⇒ Object
- #unregister_worker(worker) ⇒ Object
- #workers ⇒ Object
Constructor Details
#initialize ⇒ Memory
Returns a new instance of Memory.
7 8 9 10 |
# File 'lib/qu/backend/memory.rb', line 7

# Sets up the empty in-memory state: a worker registry and a queue table.
def initialize
  @workers = {}
  # Queues are created lazily on first access. The "failed" entry is a
  # Hash (it is written by payload id in #failed); every other queue is
  # an Array.
  @queues = Hash.new do |table, name|
    table[name] = name == "failed" ? {} : []
  end
end
Instance Method Details
#clear(queue = nil) ⇒ Object
24 25 26 27 28 29 30 31 |
# File 'lib/qu/backend/memory.rb', line 24

# Deletes the given queue (a single name or a list of names). With no
# argument, deletes every queue reported by #queues plus 'failed'.
def clear(queue = nil)
  queue = queues + ['failed'] unless queue
  logger.info { "Clearing queues: #{queue.inspect}" }
  Array(queue).each do |name|
    logger.debug "Clearing queue #{name}"
    @queues.delete(name)
  end
end
#clear_workers ⇒ Object
43 44 45 |
# File 'lib/qu/backend/memory.rb', line 43

# Forgets every registered worker by replacing the registry with a new,
# empty Hash.
def clear_workers
  @workers = {}
end
#completed(payload) ⇒ Object
101 |
# File 'lib/qu/backend/memory.rb', line 101

# Intentionally empty: this backend does nothing when a job completes.
def completed(payload)
end
#connection ⇒ Object
12 13 14 |
# File 'lib/qu/backend/memory.rb', line 12

# Returns the memoized connection value. It is set to +true+ the first
# time it is requested, unless @connection already holds a truthy value.
def connection
  @connection = true unless @connection
  @connection
end
#delayed_push(time, payload) ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/qu/backend/memory.rb', line 60

# Assigns the payload a new GUID and the given time, then appends it to
# the scheduled queue for the payload's class. Returns the payload.
def delayed_push(time, payload)
  payload.id = SimpleUUID::UUID.new.to_guid
  payload.time = time
  schedule = @queues[scheduled_queue_name(payload.klass)]
  schedule << payload
  logger.debug { "Enqueued delayed job #{payload}" }
  payload
end
#enqueue(payload) ⇒ Object
53 54 55 56 57 58 |
# File 'lib/qu/backend/memory.rb', line 53

# Assigns the payload a new GUID and appends it to the queue named by
# payload.queue. Returns the payload.
def enqueue(payload)
  payload.id = SimpleUUID::UUID.new.to_guid
  target = @queues[payload.queue]
  target << payload
  logger.debug { "Enqueued job #{payload}" }
  payload
end
#failed(payload, error) ⇒ Object
83 84 85 |
# File 'lib/qu/backend/memory.rb', line 83

# Records the payload in the "failed" table, keyed by payload.id.
# The +error+ argument is accepted but not used here.
def failed(payload, error)
  @queues["failed"].store(payload.id, payload)
end
#get_queue_by_klass(klass) ⇒ Object
107 108 109 110 |
# File 'lib/qu/backend/memory.rb', line 107

# Looks up the queue for a job class: builds a Payload for +klass+,
# asks it for its queue name, and fetches that queue.
def get_queue_by_klass(klass)
  queue_name = Payload.new(:klass => klass).queue
  get_queue_by_name(queue_name)
end
#get_queue_by_name(queue = 'default') ⇒ Object
103 104 105 |
# File 'lib/qu/backend/memory.rb', line 103

# Returns the queue stored under +queue+ (converted with #to_s first).
def get_queue_by_name(queue = 'default')
  key = queue.to_s
  @queues[key]
end
#get_schedule_by_klass(klass) ⇒ Object
112 113 114 |
# File 'lib/qu/backend/memory.rb', line 112

# Returns the scheduled (delayed) queue associated with +klass+.
def get_schedule_by_klass(klass)
  schedule_name = scheduled_queue_name(klass)
  get_queue_by_name(schedule_name)
end
#length(queue = 'default') ⇒ Object
20 21 22 |
# File 'lib/qu/backend/memory.rb', line 20

# Returns how many entries the named queue currently holds.
def length(queue = 'default')
  @queues[queue].size
end
#queues ⇒ Object
16 17 18 |
# File 'lib/qu/backend/memory.rb', line 16

# Lists the names of all non-empty queues, excluding "failed".
def queues
  @queues.keys.select do |name|
    name != "failed" && !@queues[name].empty?
  end
end
#register_worker(worker) ⇒ Object
33 34 35 36 |
# File 'lib/qu/backend/memory.rb', line 33

# Stores the worker's attributes (with its id merged in under :id) in
# the registry, keyed by worker.id. Returns the stored attributes hash.
def register_worker(worker)
  logger.debug("Registering worker #{worker.id}")
  record = worker.attributes.merge(:id => worker.id)
  @workers[worker.id] = record
end
#release(payload) ⇒ Object
97 98 99 |
# File 'lib/qu/backend/memory.rb', line 97

# Puts the payload back at the end of the queue named by payload.queue.
def release(payload)
  queue = @queues[payload.queue]
  queue << payload
end
#remove_delayed(klass, *args) ⇒ Object
68 69 70 71 72 |
# File 'lib/qu/backend/memory.rb', line 68

# Removes from the scheduled queue for +klass+ every payload whose class
# name and argument list both match. Returns the (mutated) queue.
def remove_delayed(klass, *args)
  schedule = get_queue_by_name(scheduled_queue_name(klass))
  schedule.delete_if do |job|
    same_class = job.klass.to_s == klass.to_s
    same_class && job.args == args
  end
end
#requeue(id) ⇒ Object
87 88 89 90 91 92 93 94 95 |
# File 'lib/qu/backend/memory.rb', line 87

# Moves the failed job with the given id back onto its own queue.
# Returns the payload, or +false+ when no failed job has that id.
def requeue(id)
  logger.debug "Requeuing job #{id}"
  payload = @queues["failed"].delete(id)
  return false unless payload

  @queues[payload.queue] << payload
  payload
end
#reserve(worker, options = {:block => true}) ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'lib/qu/backend/memory.rb', line 74

# Takes the next job from the first non-empty queue among worker.queues,
# checking them in the order the worker lists them.
#
# worker  - object responding to #queues (a list of queue names).
# options - Hash; when :block is truthy (the default) the method keeps
#           polling until a job shows up. When :block is falsy it makes
#           one pass over the queues and returns nil if all are empty.
#
# Returns the reserved payload, or nil in non-blocking mode when nothing
# is available.
def reserve(worker, options = {:block => true})
  loop do
    worker.queues.each do |queue|
      logger.debug { "Reserving job in queue #{queue}" }
      return @queues[queue].shift unless @queues[queue].empty?
    end
    # Without this check a non-blocking caller would spin forever on
    # empty queues, because nothing else in the loop reads options.
    return nil unless options[:block]
  end
end
#unregister_worker(worker) ⇒ Object
38 39 40 41 |
# File 'lib/qu/backend/memory.rb', line 38

# Drops the worker's entry from the registry. Returns the removed entry,
# or nil when the worker was not registered.
def unregister_worker(worker)
  logger.debug("Unregistering worker #{worker.id}")
  @workers.delete(worker.id)
end
#workers ⇒ Object
47 48 49 50 51 |
# File 'lib/qu/backend/memory.rb', line 47

# Builds a Qu::Worker for every entry in the registry.
#
# NOTE(review): @workers is a Hash, so Hash#map with a single block
# parameter hands each [id, attributes] pair to Qu::Worker.new as a
# two-element Array. Confirm Worker.new expects that, rather than the
# attributes hash alone.
def workers
  @workers.map { |entry| Qu::Worker.new(entry) }
end