Class: Qu::Backend::Mongo
- Inherits:
-
Base
- Object
- Base
- Qu::Backend::Mongo
- Defined in:
- lib/qu/backend/mongo.rb
Instance Attribute Summary collapse
-
#max_retries ⇒ Object
Number of times to retry connection on connection failure (default: 5).
-
#poll_frequency ⇒ Object
Seconds to wait before looking for more jobs when the queue is empty (default: 5).
-
#retry_frequency ⇒ Object
Seconds to wait before try to reconnect after connection failure (default: 1).
Instance Method Summary collapse
- #clear(queue = nil) ⇒ Object
- #clear_workers ⇒ Object
- #completed(payload) ⇒ Object
- #connection ⇒ Object (also: #database)
- #enqueue(payload) ⇒ Object
- #failed(payload, error) ⇒ Object
-
#initialize ⇒ Mongo
constructor
A new instance of Mongo.
- #length(queue = 'default') ⇒ Object
- #queues ⇒ Object
- #register_worker(worker) ⇒ Object
- #release(payload) ⇒ Object
- #reserve(worker, options = {:block => true}) ⇒ Object
- #unregister_worker(worker) ⇒ Object
- #workers ⇒ Object
Constructor Details
#initialize ⇒ Mongo
Returns a new instance of Mongo.
16 17 18 19 20 |
# File 'lib/qu/backend/mongo.rb', line 16 def initialize self.max_retries = 5 self.retry_frequency = 1 self.poll_frequency = 5 end |
Instance Attribute Details
#max_retries ⇒ Object
Number of times to retry connection on connection failure (default: 5)
8 9 10 |
# File 'lib/qu/backend/mongo.rb', line 8 def max_retries @max_retries end |
#poll_frequency ⇒ Object
Seconds to wait before looking for more jobs when the queue is empty (default: 5)
14 15 16 |
# File 'lib/qu/backend/mongo.rb', line 14 def poll_frequency @poll_frequency end |
#retry_frequency ⇒ Object
Seconds to wait before try to reconnect after connection failure (default: 1)
11 12 13 |
# File 'lib/qu/backend/mongo.rb', line 11 def retry_frequency @retry_frequency end |
Instance Method Details
#clear(queue = nil) ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/qu/backend/mongo.rb', line 40 def clear(queue = nil) queue ||= queues + ['failed'] logger.info { "Clearing queues: #{queue.inspect}" } Array(queue).each do |q| logger.debug "Clearing queue #{q}" jobs(q).drop self[:queues].remove({:name => q}) end end |
#clear_workers ⇒ Object
116 117 118 119 |
# File 'lib/qu/backend/mongo.rb', line 116 def clear_workers logger.info "Clearing workers" self[:workers].drop end |
#completed(payload) ⇒ Object
97 98 |
# File 'lib/qu/backend/mongo.rb', line 97 def completed(payload) end |
#connection ⇒ Object Also known as: database
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/qu/backend/mongo.rb', line 22 def connection @connection ||= begin host_uri = (ENV['MONGOHQ_URL'] || ENV['MONGOLAB_URI']).to_s uri = URI.parse(host_uri) database = uri.path.empty? ? 'qu' : uri.path[1..-1] = {} if uri.password [:auths] = [{ 'db_name' => database, 'username' => uri.user, 'password' => uri.password }] end ::Mongo::Connection.new(uri.host, uri.port, ).db(database) end end |
#enqueue(payload) ⇒ Object
58 59 60 61 62 63 64 |
# File 'lib/qu/backend/mongo.rb', line 58 def enqueue(payload) payload.id = BSON::ObjectId.new jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args}) self[:queues].update({:name => payload.queue}, {:name => payload.queue}, :upsert => true) logger.debug { "Enqueued job #{payload}" } payload end |
#failed(payload, error) ⇒ Object
93 94 95 |
# File 'lib/qu/backend/mongo.rb', line 93 def failed(payload, error) jobs('failed').insert(:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args, :queue => payload.queue) end |
#length(queue = 'default') ⇒ Object
54 55 56 |
# File 'lib/qu/backend/mongo.rb', line 54 def length(queue = 'default') jobs(queue).count end |
#queues ⇒ Object
50 51 52 |
# File 'lib/qu/backend/mongo.rb', line 50 def queues self[:queues].find.map {|doc| doc['name'] } end |
#register_worker(worker) ⇒ Object
100 101 102 103 |
# File 'lib/qu/backend/mongo.rb', line 100 def register_worker(worker) logger.debug "Registering worker #{worker.id}" self[:workers].insert(worker.attributes.merge(:id => worker.id)) end |
#release(payload) ⇒ Object
89 90 91 |
# File 'lib/qu/backend/mongo.rb', line 89 def release(payload) jobs(payload.queue).insert({:_id => payload.id, :klass => payload.klass.to_s, :args => payload.args}) end |
#reserve(worker, options = {:block => true}) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/qu/backend/mongo.rb', line 66 def reserve(worker, = {:block => true}) loop do worker.queues.each do |queue| logger.debug { "Reserving job in queue #{queue}" } begin if doc = jobs(queue).find_and_modify(:remove => true) doc['id'] = doc.delete('_id') return Payload.new(doc) end rescue ::Mongo::OperationFailure # No jobs in the queue (MongoDB <2) end end if [:block] sleep poll_frequency else break end end end |
#unregister_worker(worker) ⇒ Object
105 106 107 108 |
# File 'lib/qu/backend/mongo.rb', line 105 def unregister_worker(worker) logger.debug "Unregistering worker #{worker.id}" self[:workers].remove(:id => worker.id) end |
#workers ⇒ Object
110 111 112 113 114 |
# File 'lib/qu/backend/mongo.rb', line 110 def workers self[:workers].find.map do |doc| Qu::Worker.new(doc) end end |