Class: Delayed::Backend::MongoMapper::Job
- Inherits:
-
Object
- Object
- Delayed::Backend::MongoMapper::Job
- Includes:
- Base, MongoMapper::Document
- Defined in:
- lib/delayed/backend/mongo_mapper.rb
Class Method Summary collapse
- .after_fork ⇒ Object
- .before_fork ⇒ Object
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
- .find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
Instance Method Summary collapse
-
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker.
Class Method Details
.after_fork ⇒ Object
27 28 29 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 27 def self.after_fork ::MongoMapper.connection.connect end |
.before_fork ⇒ Object
23 24 25 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 23 def self.before_fork ::MongoMapper.connection.close end |
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
56 57 58 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 56 def self.clear_locks!(worker_name) collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true) end |
.db_time_now ⇒ Object
31 32 33 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 31 def self.db_time_now Time.now.utc end |
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 35 def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) right_now = db_time_now conditions = { :run_at => {"$lte" => right_now}, :limit => -limit, # In mongo, positive limits are 'soft' and negative are 'hard' :failed_at => nil, :sort => [[:priority, 1], [:run_at, 1]] } where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}" (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority results = all(conditions.merge(:locked_by => worker_name)) results += all(conditions.merge('$where' => where)) if results.size < limit results end |
Instance Method Details
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/delayed/backend/mongo_mapper.rb', line 62 def lock_exclusively!(max_run_time, worker = worker_name) right_now = self.class.db_time_now overtime = right_now - max_run_time.to_i query = "this.locked_at == null || this.locked_at < #{make_date(overtime)} || this.locked_by == #{worker.to_json}" conditions = {:_id => id, :run_at => {"$lte" => right_now}, "$where" => query} collection.update(conditions, {"$set" => {:locked_at => right_now, :locked_by => worker}}) affected_rows = self.collection.find({:_id => id, :locked_by => worker}).count if affected_rows == 1 self.locked_at = right_now self.locked_by = worker return true else return false end end |