Class: Delayed::Backend::MongoMapper::Job
- Inherits:
-
Object
- Object
- Delayed::Backend::MongoMapper::Job
- Includes:
- Base, MongoMapper::Document
- Defined in:
- lib/delayed/backend/mongo_mapper.rb
Constant Summary
Constants included from Base
Class Method Summary collapse
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
- .find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
Instance Method Summary collapse
-
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker.
Methods included from Base
#failed?, included, #invoke_job, #name, #payload_object, #payload_object=, #unlock
Class Method Details
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
64 65 66 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 64
# When a worker is exiting, make sure we don't have any locked jobs.
#
# Issues one multi-document update that sets locked_at and locked_by to nil
# on every document whose locked_by equals +worker_name+.
def self.clear_locks!(worker_name)
  held_by_worker = {:locked_by => worker_name}
  release_lock = {"$set" => {:locked_at => nil, :locked_by => nil}}
  collection.update(held_by_worker, release_lock, :multi => true)
end
.db_time_now ⇒ Object
39 40 41 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 39
# Returns the current time, converted to UTC, as reported by the class that
# MongoMapper.time_class hands back.
def self.db_time_now
  clock = ::MongoMapper.time_class
  clock.now.utc
end
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 43
# Finds at most +limit+ jobs that are due (run_at <= now), have not failed,
# and fall inside the Worker.min_priority / Worker.max_priority bounds when
# those are set. Results are sorted by priority, then run_at.
#
# Jobs already locked by +worker_name+ are fetched first. Only if they do not
# fill +limit+ is a second query issued for jobs that are unlocked or whose
# lock is older than +max_run_time+.
#
# worker_name  - name of the worker asking for jobs
# limit        - maximum number of jobs to return in total
# max_run_time - age (subtracted from the current time) after which a lock
#                is treated as expired
#
# Returns the combined list of jobs, never longer than +limit+ and without
# duplicates.
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  right_now = db_time_now

  conditions = {
    :run_at => {"$lte" => right_now},
    :limit => limit,
    :failed_at => nil,
    :sort => [['priority', 1], ['run_at', 1]]
  }
  (conditions[:priority] ||= {})['$gte'] = Worker.min_priority if Worker.min_priority
  (conditions[:priority] ||= {})['$lte'] = Worker.max_priority if Worker.max_priority

  results = all(conditions.merge(:locked_by => worker_name))

  # A limit of 0 means "no limit" to MongoDB, so never issue the second
  # query unless there is at least one slot left to fill.
  remaining = limit - results.size
  return results unless remaining > 0

  where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}"

  # Ask only for the missing slots, and skip jobs locked by this worker:
  # every such job that matches was already returned by the first query, and
  # one with an expired lock would otherwise be returned a second time.
  results + all(conditions.merge(
    '$where' => where,
    :locked_by => {'$ne' => worker_name},
    :limit => remaining
  ))
end
Instance Method Details
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 70
# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
#
# max_run_time - converted with to_i and subtracted from the current time to
#                get the cutoff before which an existing lock counts as stale
# worker       - name recorded in locked_by (defaults to worker_name)
def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now
  stale_before = now - max_run_time.to_i

  # The job can be taken when it is unlocked, when its lock is older than
  # the cutoff, or when this worker already holds it.
  lockable = [
    "this.locked_at == null",
    "this.locked_at < #{make_date(stale_before)}",
    "this.locked_by == #{worker.to_json}"
  ].join(" || ")

  selector = {:_id => id, :run_at => {"$lte" => now}, "$where" => lockable}
  collection.update(selector, {"$set" => {:locked_at => now, :locked_by => worker}})

  # Re-read the document to see whether it is now marked as locked by us.
  held = collection.find({:_id => id, :locked_by => worker}).count
  return false unless held == 1

  self.locked_at = now
  self.locked_by = worker
  true
end