Class: Delayed::Backend::MongoMapper::Job
Constant Summary
Constants included
from Base
Base::ParseObjectFromYaml
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Base
#failed?, included, #invoke_job, #max_attempts, #name, #payload_object, #payload_object=, #reschedule_at, #unlock
Class Method Details
permalink
.after_fork ⇒ Object
[View source]
45
46
47
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 45
def self.after_fork
::MongoMapper.connect(RAILS_ENV)
end
|
permalink
.before_fork ⇒ Object
[View source]
41
42
43
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 41
def self.before_fork
::MongoMapper.connection.close
end
|
permalink
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
[View source]
74
75
76
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 74
def self.clear_locks!(worker_name)
collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
end
|
permalink
.db_time_now ⇒ Object
[View source]
49
50
51
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 49
def self.db_time_now
Time.now.utc
end
|
permalink
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
[View source]
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 53
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
right_now = db_time_now
conditions = {
:run_at => {"$lte" => right_now},
:limit => -limit, :failed_at => nil,
:sort => [['priority', 1], ['run_at', 1]]
}
where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}"
(conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
(conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority
results = all(conditions.merge(:locked_by => worker_name))
results += all(conditions.merge('$where' => where)) if results.size < limit
results
end
|
Instance Method Details
permalink
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
[View source]
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 80
def lock_exclusively!(max_run_time, worker = worker_name)
right_now = self.class.db_time_now
overtime = right_now - max_run_time.to_i
query = "this.locked_at == null || this.locked_at < #{make_date(overtime)} || this.locked_by == #{worker.to_json}"
conditions = {:_id => id, :run_at => {"$lte" => right_now}, "$where" => query}
collection.update(conditions, {"$set" => {:locked_at => right_now, :locked_by => worker}})
affected_rows = collection.find({:_id => id, :locked_by => worker}).count
if affected_rows == 1
self.locked_at = right_now
self.locked_by = worker
return true
else
return false
end
end
|