Class: Delayed::Backend::Mongoid::Job

Inherits:
Object
  • Object
show all
Includes:
Base, Mongoid::Document, Mongoid::Timestamps
Defined in:
lib/delayed/backend/mongoid.rb

Class Method Summary collapse

Class Method Details

.after_forkObject



25
26
27
# File 'lib/delayed/backend/mongoid.rb', line 25

def self.after_fork
  ::Mongoid.master.connection.connect
end

.before_forkObject



21
22
23
# File 'lib/delayed/backend/mongoid.rb', line 21

def self.before_fork
  ::Mongoid.master.connection.close
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



72
73
74
# File 'lib/delayed/backend/mongoid.rb', line 72

def self.clear_locks!(worker_name)
  self.collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
end

.db_time_nowObject



29
30
31
# File 'lib/delayed/backend/mongoid.rb', line 29

def self.db_time_now
  Time.now.utc
end

.reserve(worker, max_run_time = Worker.max_run_time) ⇒ Object

Reserves this job for the worker.

Uses Mongo’s findAndModify operation to atomically pick and lock one job from from the collection. findAndModify is not yet available directly thru Mongoid so go down to the Mongo Ruby driver instead.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/delayed/backend/mongoid.rb', line 38

def self.reserve(worker, max_run_time = Worker.max_run_time)
  right_now = db_time_now

  conditions = {:run_at  => {"$lte" => right_now}, :failed_at => nil}
  (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
  (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority

  if ::Mongoid.master.connection.server_version < '1.5.3'
    lock_time = "new Date(#{(right_now - max_run_time).to_f * 1000})"
    conditions['$where'] = "this.locked_by == '#{worker.name}' || this.locked_at == null || this.locked_at < #{lock_time}"
  else
    conditions['$or'] = [
      { :locked_by => worker.name },
      { :locked_at => nil },
      { :locked_at => { '$lt' => (right_now - max_run_time) }}
    ]
  end

  begin
    result = self.db.collection(self.collection.name).find_and_modify(
      :query  => conditions,
      :sort   => [['locked_by', -1], ['priority', 1], ['run_at', 1]],
      :update => {"$set" => {:locked_at => right_now, :locked_by => worker.name}}
    )

    # Return result as a Mongoid document.
    # When Mongoid starts supporting findAndModify, this extra step should no longer be necessary.
    self.find(:first, :conditions => {:_id => result["_id"]})
  rescue Mongo::OperationFailure
    nil # no jobs available
  end
end