Class: Delayed::Backend::Mongoid::Job
- Inherits:
- Object
  - Object
    - Delayed::Backend::Mongoid::Job
- Includes:
- Base, Mongoid::Document, Mongoid::Timestamps
- Defined in:
- lib/delayed/backend/mongoid.rb
Class Method Summary
- .after_fork ⇒ Object
- .before_fork ⇒ Object
- .clear_locks!(worker_name) ⇒ Object
  When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
- .reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
  Reserves a job for the worker.
Class Method Details
.after_fork ⇒ Object
    # File 'lib/delayed/backend/mongoid.rb', line 25

    def self.after_fork
      ::Mongoid.master.connection.connect
    end
.before_fork ⇒ Object
    # File 'lib/delayed/backend/mongoid.rb', line 21

    def self.before_fork
      ::Mongoid.master.connection.close
    end
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
    # File 'lib/delayed/backend/mongoid.rb', line 72

    def self.clear_locks!(worker_name)
      self.collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
    end
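The effect of the multi-document update above can be illustrated without a database. The following is a minimal sketch, assuming the jobs collection is modelled as a plain array of hashes; `clear_locks_sketch` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical in-memory stand-in for the jobs collection (not the Mongoid backend).
# Mirrors the update: every job locked by worker_name gets both lock fields set to nil.
def clear_locks_sketch(jobs, worker_name)
  jobs.each do |job|
    next unless job[:locked_by] == worker_name

    job[:locked_at] = nil
    job[:locked_by] = nil
  end
end

jobs = [
  { :id => 1, :locked_by => "worker-1", :locked_at => Time.now.utc },
  { :id => 2, :locked_by => "worker-2", :locked_at => Time.now.utc },
  { :id => 3, :locked_by => "worker-1", :locked_at => Time.now.utc }
]

clear_locks_sketch(jobs, "worker-1")
```

Jobs held by other workers are left untouched, which corresponds to the `{:locked_by => worker_name}` selector in the real call.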
.db_time_now ⇒ Object
    # File 'lib/delayed/backend/mongoid.rb', line 29

    def self.db_time_now
      Time.now.utc
    end
.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object
Reserves a job for the worker.
Uses Mongo’s findAndModify operation to atomically pick and lock one job from the collection. findAndModify is not yet available directly through Mongoid, so this method goes down to the Mongo Ruby driver instead.
    # File 'lib/delayed/backend/mongoid.rb', line 38

    def self.reserve(worker, max_run_time = Worker.max_run_time)
      right_now = db_time_now

      conditions = {:run_at => {"$lte" => right_now}, :failed_at => nil}
      (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
      (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority

      if ::Mongoid.master.connection.server_version < '1.5.3'
        lock_time = "new Date(#{(right_now - max_run_time).to_f * 1000})"
        conditions['$where'] = "this.locked_by == '#{worker.name}' || this.locked_at == null || this.locked_at < #{lock_time}"
      else
        conditions['$or'] = [
          { :locked_by => worker.name },
          { :locked_at => nil },
          { :locked_at => { '$lt' => (right_now - max_run_time) }}
        ]
      end

      begin
        result = self.db.collection(self.collection.name).find_and_modify(
          :query  => conditions,
          :sort   => [['locked_by', -1], ['priority', 1], ['run_at', 1]],
          :update => {"$set" => {:locked_at => right_now, :locked_by => worker.name}}
        )

        # Return result as a Mongoid document.
        # When Mongoid starts supporting findAndModify, this extra step should no longer be necessary.
        self.find(:first, :conditions => {:_id => result["_id"]})
      rescue Mongo::OperationFailure
        nil # no jobs available
      end
    end
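The selection rules encoded in the query can be easier to follow in isolation. The following is a minimal in-memory sketch, not the backend's implementation: `SketchJob` and `SketchQueue` are hypothetical names, a `Mutex` stands in for the atomicity that findAndModify provides, and the `locked_by` descending sort is simplified to "jobs that already carry a lock owner come first". Priority bounds are omitted.

```ruby
# Hypothetical in-memory model of a job; field names follow the query above.
SketchJob = Struct.new(:id, :priority, :run_at, :failed_at, :locked_at, :locked_by)

class SketchQueue
  def initialize(jobs)
    @jobs = jobs
    @mutex = Mutex.new # assumption: stands in for findAndModify's atomic pick-and-lock
  end

  # Picks and locks one eligible job, or returns nil when none qualifies.
  def reserve(worker_name, max_run_time, now = Time.now.utc)
    @mutex.synchronize do
      eligible = @jobs.select do |job|
        job.run_at <= now && job.failed_at.nil? &&
          (job.locked_by == worker_name ||      # already held by this worker
           job.locked_at.nil? ||                # not locked at all
           job.locked_at < now - max_run_time)  # lock has expired
      end

      # Simplified ordering: jobs with a lock owner first, then lowest priority
      # value, then earliest run_at.
      job = eligible.min_by { |j| [j.locked_by.nil? ? 1 : 0, j.priority, j.run_at] }
      if job
        job.locked_at = now
        job.locked_by = worker_name
      end
      job
    end
  end
end

now = Time.utc(2020, 1, 1, 12, 0, 0)
queue = SketchQueue.new([
  SketchJob.new(1, 5, now - 60, nil, nil, nil),
  SketchJob.new(2, 0, now - 60, nil, now - 10, "other"), # freshly locked elsewhere
  SketchJob.new(3, 1, now + 60, nil, nil, nil),          # not due yet
  SketchJob.new(4, 1, now - 30, nil, nil, nil)
])

reserved = queue.reserve("worker-1", 3600, now)
puts reserved.id
```

In this sketch a worker that calls `reserve` again before finishing its job gets the same job back, because jobs it already holds match the first lock condition and sort ahead of unlocked ones.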