Class: Qu::Backend::Mongo
- Inherits:
-
Base
- Object
- Base
- Qu::Backend::Mongo
- Defined in:
- lib/qu/backend/mongo.rb
Instance Method Summary collapse
- #clear(queue = nil) ⇒ Object
- #clear_workers ⇒ Object
- #completed(payload) ⇒ Object
- #connection ⇒ Object (also: #database)
- #enqueue(payload) ⇒ Object
- #failed(payload, error) ⇒ Object
- #length(queue = 'default') ⇒ Object
- #queues ⇒ Object
- #register_worker(worker) ⇒ Object
- #release(payload) ⇒ Object
- #requeue(id) ⇒ Object
- #reserve(worker, options = {:block => true}) ⇒ Object
- #unregister_worker(worker) ⇒ Object
- #workers ⇒ Object
Instance Method Details
#clear(queue = nil) ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'lib/qu/backend/mongo.rb', line 23 def clear(queue = nil) queue ||= queues + ['failed'] logger.info { "Clearing queues: #{queue.inspect}" } Array(queue).each do |q| logger.debug "Clearing queue #{q}" jobs(q).drop self[:queues].remove({:name => q}) end end |
#clear_workers ⇒ Object
109 110 111 112 |
# File 'lib/qu/backend/mongo.rb', line 109 def clear_workers logger.info "Clearing workers" self[:workers].drop end |
#completed(payload) ⇒ Object
80 81 |
# File 'lib/qu/backend/mongo.rb', line 80 def completed(payload) end |
#connection ⇒ Object Also known as: database
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/qu/backend/mongo.rb', line 6 def connection @connection ||= begin uri = URI.parse(ENV['MONGOHQ_URL'].to_s) database = uri.path.empty? ? 'qu' : uri.path[1..-1] = {} if uri.password [:auths] = [{ 'db_name' => database, 'username' => uri.user, 'password' => uri.password }] end ::Mongo::Connection.new(uri.host, uri.port, ).db(database) end end |
#enqueue(payload) ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/qu/backend/mongo.rb', line 41 def enqueue(payload) payload.id = BSON::ObjectId.new jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args}) self[:queues].update({:name => payload.queue}, {:name => payload.queue}, :upsert => true) logger.debug { "Enqueued job #{payload}" } payload end |
#failed(payload, error) ⇒ Object
76 77 78 |
# File 'lib/qu/backend/mongo.rb', line 76 def failed(payload, error) jobs('failed').insert(:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args, :queue => payload.queue) end |
#length(queue = 'default') ⇒ Object
37 38 39 |
# File 'lib/qu/backend/mongo.rb', line 37 def length(queue = 'default') jobs(queue).count end |
#queues ⇒ Object
33 34 35 |
# File 'lib/qu/backend/mongo.rb', line 33 def queues self[:queues].find.map {|doc| doc['name'] } end |
#register_worker(worker) ⇒ Object
93 94 95 96 |
# File 'lib/qu/backend/mongo.rb', line 93 def register_worker(worker) logger.debug "Registering worker #{worker.id}" self[:workers].insert(worker.attributes.merge(:id => worker.id)) end |
#release(payload) ⇒ Object
72 73 74 |
# File 'lib/qu/backend/mongo.rb', line 72 def release(payload) jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args}) end |
#requeue(id) ⇒ Object
83 84 85 86 87 88 89 90 91 |
# File 'lib/qu/backend/mongo.rb', line 83 def requeue(id) logger.debug "Requeuing job #{id}" doc = jobs('failed').find_and_modify(:query => {:_id => id}, :remove => true) || raise(::Mongo::OperationFailure) jobs(doc.delete('queue')).insert(doc) doc['id'] = doc.delete('_id') Payload.new(doc) rescue ::Mongo::OperationFailure false end |
#reserve(worker, options = {:block => true}) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/qu/backend/mongo.rb', line 49 def reserve(worker, = {:block => true}) loop do worker.queues.each do |queue| logger.debug { "Reserving job in queue #{queue}" } begin if doc = jobs(queue).find_and_modify(:remove => true) doc['id'] = doc.delete('_id') return Payload.new(doc) end rescue ::Mongo::OperationFailure # No jobs in the queue (MongoDB <2) end end if [:block] sleep 5 else break end end end |
#unregister_worker(worker) ⇒ Object
98 99 100 101 |
# File 'lib/qu/backend/mongo.rb', line 98 def unregister_worker(worker) logger.debug "Unregistering worker #{worker.id}" self[:workers].remove(:id => worker.id) end |
#workers ⇒ Object
103 104 105 106 107 |
# File 'lib/qu/backend/mongo.rb', line 103 def workers self[:workers].find.map do |doc| Qu::Worker.new(doc) end end |