Class: Delayed::Backend::MongoMapper::Job

Inherits:
Object
  • Object
show all
Includes:
Base, MongoMapper::Document
Defined in:
lib/delayed/backend/mongo_mapper.rb

Constant Summary

Constants included from Base

Base::ParseObjectFromYaml

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#failed?, included, #invoke_job, #max_attempts, #name, #payload_object, #payload_object=, #reschedule_at, #unlock

Class Method Details

.after_fork ⇒ Object

[View source]

45
46
47
# File 'lib/delayed/backend/mongo_mapper.rb', line 45

# Hook (named for use after a fork) that connects MongoMapper using the
# current Rails environment name.
#
# Prefers +Rails.env+ when the Rails module exposes it and falls back to the
# legacy +RAILS_ENV+ constant otherwise, so the hook does not raise a
# NameError on Rails versions that no longer define +RAILS_ENV+.
#
# Returns whatever +MongoMapper.connect+ returns.
def self.after_fork
  # to_s yields a plain String even when Rails.env is a String subclass.
  environment = defined?(::Rails) && ::Rails.respond_to?(:env) ? ::Rails.env.to_s : RAILS_ENV
  ::MongoMapper.connect(environment)
end

.before_fork ⇒ Object

[View source]

41
42
43
# File 'lib/delayed/backend/mongo_mapper.rb', line 41

# Hook (named for use before a fork) that closes the current MongoMapper
# connection.
#
# Returns whatever the connection's +close+ returns.
def self.before_fork
  active_connection = ::MongoMapper.connection
  active_connection.close
end

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.

[View source]

74
75
76
# File 'lib/delayed/backend/mongo_mapper.rb', line 74

# Releases every lock held by +worker_name+.
#
# Issues a single multi-document update that sets +locked_at+ and
# +locked_by+ to nil on all documents whose +locked_by+ equals +worker_name+.
#
# worker_name - value matched against each document's +locked_by+ field.
#
# Returns whatever +collection.update+ returns.
def self.clear_locks!(worker_name)
  held_by_worker = {:locked_by => worker_name}
  release_lock = {"$set" => {:locked_at => nil, :locked_by => nil}}

  collection.update(held_by_worker, release_lock, :multi => true)
end

.db_time_now ⇒ Object

[View source]

49
50
51
# File 'lib/delayed/backend/mongo_mapper.rb', line 49

# Returns the current time as a Time in UTC.
def self.db_time_now
  current = Time.now
  current.utc
end

.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object

[View source]

53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/delayed/backend/mongo_mapper.rb', line 53

# Finds jobs that +worker_name+ may attempt to run.
#
# Two queries share the same base conditions (run_at not in the future,
# failed_at nil, sorted by priority then run_at, optionally bounded by
# Worker.min_priority / Worker.max_priority):
#
# 1. jobs whose +locked_by+ equals +worker_name+;
# 2. only when the first query returned fewer than +limit+ jobs: jobs that
#    are unlocked or whose lock is older than +max_run_time+.
#
# A job locked by this worker whose lock has also expired matches both
# queries, so the second result set is filtered by id before merging and the
# merged list is capped at +limit+. This keeps the result free of duplicates
# and never longer than requested.
#
# worker_name  - name matched against +locked_by+ in the first query.
# limit        - maximum number of jobs to return (default 5).
# max_run_time - subtracted from the current time to find the lock-expiry
#                cutoff (default Worker.max_run_time).
#
# Returns the jobs from the first query followed by the additional
# unlocked/expired jobs.
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  right_now = db_time_now

  conditions = {
    :run_at => {"$lte" => right_now},
    :limit => -limit, # In mongo, positive limits are 'soft' and negative are 'hard'
    :failed_at => nil,
    :sort => [['priority', 1], ['run_at', 1]]
  }

  where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}"

  (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
  (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority

  results = all(conditions.merge(:locked_by => worker_name))
  return results unless results.size < limit

  # Drop jobs already returned by the first query, then trim to the limit.
  own_ids = results.map { |job| job.id }
  unlocked = all(conditions.merge('$where' => where)).reject { |job| own_ids.include?(job.id) }
  (results + unlocked).first(limit)
end

Instance Method Details

#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.

[View source]

80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/delayed/backend/mongo_mapper.rb', line 80

# Tries to take the lock on this job for +worker+.
#
# Issues an update that stamps +locked_at+ / +locked_by+ on this document,
# provided its run_at is not in the future and it is either unlocked, locked
# longer ago than +max_run_time+ seconds, or already locked by +worker+.
# It then re-reads the collection to check that the document is locked by
# +worker+.
#
# max_run_time - lock lifetime; converted with +to_i+ and subtracted from the
#                current time to get the expiry cutoff.
# worker       - name recorded in +locked_by+ (default +worker_name+).
#
# Returns true (and updates the in-memory +locked_at+ / +locked_by+) when
# exactly one document with this id is locked by +worker+, false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now
  expiry = now - max_run_time.to_i

  lockable = "this.locked_at == null || this.locked_at < #{make_date(expiry)} || this.locked_by == #{worker.to_json}"
  selector = {:_id => id, :run_at => {"$lte" => now}, "$where" => lockable}
  collection.update(selector, {"$set" => {:locked_at => now, :locked_by => worker}})

  # The update result is not inspected; ownership is confirmed by a follow-up count.
  owned = collection.find({:_id => id, :locked_by => worker}).count
  return false unless owned == 1

  self.locked_at = now
  self.locked_by = worker
  return true
end