Class: Delayed::Backend::DataMapper::Job
- Includes:
- DataMapper::Resource, Base
- Defined in:
- lib/delayed/backend/data_mapper.rb
Constant Summary
Constants included from Base
Class Method Summary collapse
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
-
.delete_all ⇒ Object
These are common to the other backends, so we provide an implementation.
- .find(id) ⇒ Object
- .find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
Instance Method Summary collapse
-
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker.
- #update_attributes(attributes) ⇒ Object
Methods included from Base
#failed?, included, #invoke_job, #name, #payload_object, #payload_object=, #unlock
Class Method Details
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
83 84 85 |
# File 'lib/delayed/backend/data_mapper.rb', line 83
#
# Releases every lock held by the given worker; meant to be called when a
# worker is exiting so that it leaves no locked jobs behind.
#
# worker_name - the value matched against each job's locked_by attribute.
#
# Returns whatever the collection's #update call returns.
def self.clear_locks!(worker_name)
  held_by_worker = all(:locked_by => worker_name)
  held_by_worker.update(:locked_at => nil, :locked_by => nil)
end
.db_time_now ⇒ Object
58 59 60 |
# File 'lib/delayed/backend/data_mapper.rb', line 58 def self.db_time_now Time.now end |
.delete_all ⇒ Object
These are common to the other backends, so we provide an implementation.
120 121 122 |
# File 'lib/delayed/backend/data_mapper.rb', line 120 def self.delete_all Delayed::Job.auto_migrate! end |
.find(id) ⇒ Object
124 125 126 |
# File 'lib/delayed/backend/data_mapper.rb', line 124
#
# Looks up a single job by its key; a thin wrapper that forwards to get.
#
# id - the key passed through to get unchanged.
#
# Returns whatever get returns for that key.
def self.find(id)
  get(id)
end
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/delayed/backend/data_mapper.rb', line 62
#
# Builds the collection of jobs that the given worker may pick up.
#
# worker_name  - name of the worker asking for jobs.
# limit        - maximum number of jobs to return (default 5).
# max_run_time - a lock older than this is treated as expired
#                (default Worker.max_run_time).
#
# A job qualifies when it is due (run_at <= now), has not failed, falls in
# the Worker.min_priority..Worker.max_priority window (each bound is only
# applied when set), and is lockable: not locked, locked longer ago than
# max_run_time, or already locked by this worker. Results are ordered by
# ascending priority, then ascending run_at.
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  filters = {
    :run_at.lte => db_time_now,
    :limit      => limit,
    :failed_at  => nil,
    :order      => [:priority.asc, :run_at.asc]
  }

  # Respect the configured priority window, if any.
  filters[:priority.gte] = Worker.min_priority if Worker.min_priority
  filters[:priority.lte] = Worker.max_priority if Worker.max_priority

  # The three ways a job can be lockable by this worker.
  never_locked = all(:locked_at => nil)
  lock_expired = all(:locked_at.lt => db_time_now - max_run_time)
  locked_by_us = all(:locked_by => worker_name)

  ((never_locked | lock_expired) | locked_by_us).all(filters)
end
Instance Method Details
#lock_exclusively!(max_run_time, worker = worker_name) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/delayed/backend/data_mapper.rb', line 89
#
# Lock this job for this worker.
#
# max_run_time - a lock older than this is treated as expired.
# worker       - name of the worker taking the lock (default worker_name).
#
# Returns true if we have the lock, false otherwise. On success the
# in-memory locked_at and locked_by attributes are set to match the update.
def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now
  overtime = now - max_run_time

  # FIXME - this is a bit gross
  # DM doesn't give us the number of rows affected by a collection update,
  # so the update goes through the repository directly instead of through
  # DM::Collection.
  candidates =
    if locked_by == worker
      # We already own the lock: just refresh it.
      self.class.all(:id => id, :locked_by => worker)
    else
      # Otherwise the job must be due and either unlocked or locked for
      # longer than max_run_time.
      due_now = self.class.all(:id => id, :run_at.lte => now)
      unlocked = self.class.all(:locked_at => nil)
      lock_expired = self.class.all(:locked_at.lt => overtime)
      due_now & (unlocked | lock_expired)
    end

  lock_values = candidates.model.new(:locked_at => now, :locked_by => worker).dirty_attributes
  return false unless repository.update(lock_values, candidates) == 1

  self.locked_at = now
  self.locked_by = worker
  true
end
#update_attributes(attributes) ⇒ Object
128 129 130 131 132 133 |
# File 'lib/delayed/backend/data_mapper.rb', line 128
#
# Assigns each key/value pair through #[]= and then saves the record.
#
# attributes - an enumerable of attribute-name/value pairs (e.g. a Hash).
#
# Returns the result of #save.
def update_attributes(attributes)
  attributes.each { |name, value| self[name] = value }
  save
end