Class: Delayed::Backend::ActiveRecord::Job
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - Delayed::Backend::ActiveRecord::Job
- Includes: Base
- Defined in: lib/delayed/backend/active_record.rb
Overview
A job object that is persisted to the database. Contains the work object as a YAML field.
Constant Summary
Constants included from Base
Class Method Summary
- .after_fork ⇒ Object
- .clear_locks!(worker_name) ⇒ Object
  When a worker is exiting, make sure we don’t have any locked jobs.
- .db_time_now ⇒ Object
  Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
- .find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time, queue_names = ["default"]) ⇒ Object
  Find a few candidate jobs to run (in case some immediately get locked by others).
Instance Method Summary
- #lock_exclusively!(max_run_time, worker) ⇒ Object
  Lock this job for this worker.
Methods included from Base
#failed?, included, #invoke_job, #max_attempts, #name, #payload_object, #payload_object=, #reschedule_at, #unlock
Class Method Details
.after_fork ⇒ Object
    # File 'lib/delayed/backend/active_record.rb', line 38
    def self.after_fork
      ::ActiveRecord::Base.connection.reconnect!
    end
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
    # File 'lib/delayed/backend/active_record.rb', line 43
    def self.clear_locks!(worker_name)
      update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
    end
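The effect of this `update_all` call can be illustrated without a database. The following is a minimal in-memory sketch, not the library's code: `LockRow` and `clear_locks` are hypothetical names, and a plain array stands in for the jobs table.

```ruby
# Hypothetical stand-in for a row in the jobs table.
LockRow = Struct.new(:id, :locked_by, :locked_at)

# Mimics "SET locked_by = NULL, locked_at = NULL WHERE locked_by = worker_name".
# Returns the number of rows changed, in the way update_all reports affected rows.
def clear_locks(rows, worker_name)
  owned = rows.select { |row| row.locked_by == worker_name }
  owned.each do |row|
    row.locked_by = nil
    row.locked_at = nil
  end
  owned.size
end

rows = [
  LockRow.new(1, "worker-a", Time.now),
  LockRow.new(2, "worker-b", Time.now),
  LockRow.new(3, nil, nil)
]

# Only the row held by worker-a is released; worker-b keeps its lock.
released = clear_locks(rows, "worker-a")
```

Only locks held by the named worker are touched, so one worker exiting does not release jobs that other workers are still running.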
.db_time_now ⇒ Object
Get the current time (GMT or local, depending on the DB). Note: this does not ping the DB to get the time, so all your clients must have synchronized clocks.
    # File 'lib/delayed/backend/active_record.rb', line 84
    def self.db_time_now
      if Time.zone
        Time.zone.now
      elsif ::ActiveRecord::Base.default_timezone == :utc
        Time.now.utc
      else
        Time.now
      end
    end
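The branch order can be exercised in plain Ruby. This is a sketch under stated assumptions: `pick_time` and `FixedZone` are hypothetical, with `zone` standing in for `Time.zone` and `default_timezone` standing in for `::ActiveRecord::Base.default_timezone`.

```ruby
# Hypothetical zone object: anything that responds to #now.
FixedZone = Struct.new(:now)

# Mirrors the three branches of db_time_now with the globals passed in explicitly.
def pick_time(zone, default_timezone)
  if zone
    zone.now            # a configured zone always wins
  elsif default_timezone == :utc
    Time.now.utc        # no zone, UTC storage
  else
    Time.now            # no zone, local-time storage
  end
end

zoned    = pick_time(FixedZone.new(Time.at(0)), :local)
utc_time = pick_time(nil, :utc)
```

In every branch the value comes from the client's clock, which is why the note above asks for synchronized clocks across all clients.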
.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time, queue_names = ["default"]) ⇒ Object
Find a few candidate jobs to run (in case some immediately get locked by others).
    # File 'lib/delayed/backend/active_record.rb', line 48
    def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time, queue_names=["default"])
      scope = self.ready_to_run(queue_names, worker_name, max_run_time)
      scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
      scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority

      ::ActiveRecord::Base.silence do
        scope.by_priority.all(:limit => limit)
      end
    end
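The optional priority bounds and the limit can be sketched in memory. This is an illustration only: `QueuedJob` and `pick_candidates` are hypothetical, the input is assumed to have already passed the `ready_to_run` scope, and the sort order (ascending priority, then `run_at`) is an assumption about what `by_priority` does, since its definition is not shown here.

```ruby
# Hypothetical stand-in for a job row.
QueuedJob = Struct.new(:id, :priority, :run_at)

# Applies each priority bound only when it is set, then sorts and truncates.
def pick_candidates(jobs, limit: 5, min_priority: nil, max_priority: nil)
  scope = jobs
  scope = scope.select { |job| job.priority >= min_priority } if min_priority
  scope = scope.select { |job| job.priority <= max_priority } if max_priority
  # Assumed ordering: lowest priority number first, oldest run_at as tie-breaker.
  scope.sort_by { |job| [job.priority, job.run_at] }.first(limit)
end

t = Time.at(0)
jobs = [
  QueuedJob.new(1, 10, t),
  QueuedJob.new(2, 0, t + 5),
  QueuedJob.new(3, 0, t),
  QueuedJob.new(4, -5, t)
]

# Job 4 falls below min_priority; of the rest, the two best-ranked are kept.
picked = pick_candidates(jobs, limit: 2, min_priority: 0)
```

Returning several candidates rather than one gives the caller fallbacks when another worker locks the first candidate before it can be claimed.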
Instance Method Details
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
    # File 'lib/delayed/backend/active_record.rb', line 60
    def lock_exclusively!(max_run_time, worker)
      now = self.class.db_time_now
      affected_rows = if locked_by != worker
        # We don't own this job so we will update the locked_by name and the locked_at
        self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
      else
        # We already own this job, this may happen if the job queue crashes.
        # Simply resume and update the locked_at
        self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
      end
      if affected_rows == 1
        self.locked_at = now
        self.locked_by = worker
        self.locked_at_will_change!
        self.locked_by_will_change!
        return true
      else
        return false
      end
    end
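The locking rule in the conditional update can be traced with an in-memory sketch. `LockableJob` and `try_lock` are hypothetical names, and `now` is passed in explicitly so the example is deterministic. Unlike the single SQL `UPDATE` above, this sketch is not atomic and only illustrates the conditions.

```ruby
# Hypothetical stand-in for a job row.
LockableJob = Struct.new(:id, :run_at, :locked_at, :locked_by)

# Returns true if the worker holds the lock afterwards, false otherwise.
def try_lock(job, worker, max_run_time, now)
  acquired =
    if job.locked_by != worker
      # Not ours: take it only if it is due and either unlocked or the
      # existing lock is older than max_run_time.
      lock_free = job.locked_at.nil? || job.locked_at < now - max_run_time
      lock_free && job.run_at <= now
    else
      # Already ours (for example after a crash): just refresh the timestamp.
      true
    end
  return false unless acquired

  job.locked_at = now
  job.locked_by = worker
  true
end

now = Time.at(1_000_000)
job = LockableJob.new(1, now - 60, nil, nil)

first  = try_lock(job, "worker-a", 3600, now)         # free job: lock is taken
second = try_lock(job, "worker-b", 3600, now + 10)    # still held by worker-a
third  = try_lock(job, "worker-b", 3600, now + 7200)  # lock has outlived max_run_time
```

In the real method the check and the write happen in one `UPDATE` statement, so two workers racing for the same row cannot both see `affected_rows == 1`.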