Class: Delayed::Backend::MongoMapper::Job
- Inherits:
-
Object
- Object
- Delayed::Backend::MongoMapper::Job
show all
- Includes:
- Base, MongoMapper::Document
- Defined in:
- lib/delayed/backend/mongo_mapper.rb
Constant Summary
Constants included
from Base
Base::ParseObjectFromYaml
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Base
#failed?, included, #invoke_job, #name, #payload_object, #payload_object=, #unlock
Class Method Details
.after_fork ⇒ Object
41
42
43
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 41
# Opens the MongoMapper connection for the current application environment.
# NOTE(review): the name suggests this runs in a freshly forked process —
# the caller is not visible here, confirm.
#
# Rails.env is preferred when it exists because the RAILS_ENV constant was
# deprecated in Rails 3 and is not defined by later releases; RAILS_ENV is
# still used as the fallback so older applications behave exactly as before.
def self.after_fork
  environment =
    if defined?(::Rails) && ::Rails.respond_to?(:env)
      ::Rails.env.to_s # plain String, the same type RAILS_ENV carried
    else
      RAILS_ENV
    end
  ::MongoMapper.connect(environment)
end
|
.before_fork ⇒ Object
37
38
39
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 37
# Closes the connection currently held by MongoMapper and returns whatever
# the connection's #close returns.
# NOTE(review): presumably invoked just before a worker forks — the caller
# is not visible here, confirm.
def self.before_fork
  active_connection = ::MongoMapper.connection
  active_connection.close
end
|
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
70
71
72
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 70
# Releases every lock held by the given worker, so that no job stays locked
# once that worker exits.
#
# Issues a single multi-document update: each document whose locked_by
# equals worker_name gets locked_at and locked_by set to nil.
#
# worker_name - value matched against the locked_by field.
#
# Returns whatever collection.update returns.
def self.clear_locks!(worker_name)
  held_by_worker = {:locked_by => worker_name}
  release_lock = {"$set" => {:locked_at => nil, :locked_by => nil}}
  collection.update(held_by_worker, release_lock, :multi => true)
end
|
.db_time_now ⇒ Object
45
46
47
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 45
# Returns the current time expressed in UTC. The other methods of this
# backend use it as "now" when building queries and lock timestamps.
def self.db_time_now
  current = Time.now
  current.utc
end
|
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 49
# Finds jobs that are due to run and that the given worker may pick up.
#
# Two queries share the same base criteria (run_at not in the future,
# failed_at nil, sorted by priority then run_at, and the optional
# Worker.min_priority / Worker.max_priority bounds):
#
# 1. jobs whose locked_by equals worker_name;
# 2. only when the first query returned fewer than `limit` jobs: jobs that
#    are unlocked or whose lock is older than max_run_time. These are
#    appended to the first result set.
#
# NOTE(review): the second result set is appended untrimmed and without
# de-duplication, so the combined array can hold more than `limit` entries.
#
# worker_name  - value matched against locked_by in the first query.
# limit        - passed to each query negated, as :limit => -limit.
# max_run_time - subtracted from the current time to get the lock expiry.
#
# Returns the array of jobs found.
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  now = db_time_now

  criteria = {}
  criteria[:run_at] = {"$lte" => now}
  criteria[:limit] = -limit
  criteria[:failed_at] = nil
  criteria[:sort] = [['priority', 1], ['run_at', 1]]

  # JavaScript predicate: never locked, or the lock has outlived max_run_time.
  unlocked_or_expired = "this.locked_at == null || this.locked_at < #{make_date(now - max_run_time)}"

  if Worker.min_priority
    criteria[:priority] ||= {}
    criteria[:priority]['$gte'] = Worker.min_priority.to_i
  end
  if Worker.max_priority
    criteria[:priority] ||= {}
    criteria[:priority]['$lte'] = Worker.max_priority.to_i
  end

  jobs = all(criteria.merge(:locked_by => worker_name))
  return jobs if jobs.size >= limit

  jobs + all(criteria.merge('$where' => unlocked_or_expired))
end
|
Instance Method Details
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/delayed/backend/mongo_mapper.rb', line 76
# Tries to lock this job for the given worker.
#
# Sends an update that sets locked_at/locked_by on this job's document,
# guarded so it only matches when the job is due (run_at <= now) and is
# either unlocked, locked longer ago than max_run_time, or already locked by
# this worker. A follow-up query then counts documents with this id and
# locked_by == worker to find out whether the lock is held.
#
# max_run_time - lock lifetime; converted with to_i before being subtracted
#                from the current time.
# worker       - name recorded in locked_by (defaults to worker_name).
#
# Returns true if we have the lock (and mirrors locked_at/locked_by onto
# this instance), false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now
  expiry = now - max_run_time.to_i

  lockable = "this.locked_at == null || this.locked_at < #{make_date(expiry)} || this.locked_by == #{worker.to_json}"
  selector = {:_id => id, :run_at => {"$lte" => now}, "$where" => lockable}
  collection.update(selector, {"$set" => {:locked_at => now, :locked_by => worker}})

  # The update result is not inspected; ownership is verified by re-querying.
  owned = collection.find({:_id => id, :locked_by => worker}).count
  return false unless owned == 1

  self.locked_at = now
  self.locked_by = worker
  true
end
|