Class: Delayed::Job
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Delayed::Job
- Defined in:
- lib/delayed/job.rb
Overview
A job object that is persisted to the database. Contains the work object as a YAML field.
Constant Summary collapse
- ParseObjectFromYaml =
/\!ruby\/\w+\:([^\s]+)/
- @@max_attempts =
25
- @@max_run_time =
4.hours
Class Method Summary collapse
-
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
-
.enqueue(*args, &block) ⇒ Object
Add a job to the queue.
-
.find_available(worker_name, limit = 5, max_run_time = max_run_time) ⇒ Object
Find a few candidate jobs to run (in case some immediately get locked by others).
Instance Method Summary collapse
- #failed? ⇒ Boolean (also: #failed)
-
#invoke_job ⇒ Object
Moved into its own method so that new_relic can trace it.
-
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker.
-
#log_exception(error) ⇒ Object
This is a good hook if you need to report job processing errors in additional or different ways.
- #name ⇒ Object
- #payload_object ⇒ Object
- #payload_object=(object) ⇒ Object
-
#reschedule(message, backtrace = [], time = nil) ⇒ Object
Reschedule the job in the future (when a job fails).
-
#run(max_run_time) ⇒ Object
Try to run job.
-
#run_with_lock(max_run_time, worker_name) ⇒ Object
Try to lock and run job.
-
#unlock ⇒ Object
Unlock this job (note: not saved to DB).
Class Method Details
.clear_locks!(worker_name) ⇒ Object
When a worker is exiting, make sure we don’t have any locked jobs.
36 37 38 |
# File 'lib/delayed/job.rb', line 36 def self.clear_locks!(worker_name) update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name]) end |
.enqueue(*args, &block) ⇒ Object
Add a job to the queue
110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/delayed/job.rb', line 110 def self.enqueue(*args, &block) object = block_given? ? EvaledJob.new(&block) : args.shift unless object.respond_to?(:perform) || block_given? raise ArgumentError, 'Cannot enqueue items which do not respond to perform' end priority = args.first || 0 run_at = args[1] Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at) end |
.find_available(worker_name, limit = 5, max_run_time = max_run_time) ⇒ Object
Find a few candidate jobs to run (in case some immediately get locked by others).
124 125 126 127 128 129 130 131 132 |
# File 'lib/delayed/job.rb', line 124 def self.find_available(worker_name, limit = 5, max_run_time = max_run_time) scope = self.ready_to_run(worker_name, max_run_time) scope = scope.scoped(:conditions => ['priority >= ?', min_priority]) if min_priority scope = scope.scoped(:conditions => ['priority <= ?', max_priority]) if max_priority ActiveRecord::Base.silence do scope.by_priority.all(:limit => limit) end end |
Instance Method Details
#failed? ⇒ Boolean Also known as: failed
40 41 42 |
# File 'lib/delayed/job.rb', line 40 def failed? failed_at end |
#invoke_job ⇒ Object
Moved into its own method so that new_relic can trace it.
168 169 170 |
# File 'lib/delayed/job.rb', line 168 def invoke_job payload_object.perform end |
#lock_exclusively!(max_run_time, worker) ⇒ Object
Lock this job for this worker. Returns true if we have the lock, false otherwise.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/delayed/job.rb', line 136 def lock_exclusively!(max_run_time, worker) now = self.class.db_time_now affected_rows = if locked_by != worker # We don't own this job so we will update the locked_by name and the locked_at self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]) else # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) end if affected_rows == 1 self.locked_at = now self.locked_by = worker return true else return false end end |
#log_exception(error) ⇒ Object
This is a good hook if you need to report job processing errors in additional or different ways
162 163 164 165 |
# File 'lib/delayed/job.rb', line 162 def log_exception(error) logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.} - #{attempts} failed attempts" logger.error(error) end |
#name ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/delayed/job.rb', line 49 def name @name ||= begin payload = payload_object if payload.respond_to?(:display_name) payload.display_name else payload.class.name end end end |
#payload_object ⇒ Object
45 46 47 |
# File 'lib/delayed/job.rb', line 45 def payload_object @payload_object ||= deserialize(self['handler']) end |
#payload_object=(object) ⇒ Object
60 61 62 |
# File 'lib/delayed/job.rb', line 60 def payload_object=(object) self['handler'] = object.to_yaml end |
#reschedule(message, backtrace = [], time = nil) ⇒ Object
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/delayed/job.rb', line 66 def reschedule(, backtrace = [], time = nil) self.last_error = + "\n" + backtrace.join("\n") if (self.attempts += 1) < max_attempts time ||= Job.db_time_now + (attempts ** 4) + 5 self.run_at = time self.unlock save! else logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consecutive failures." destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now) end end |
#run(max_run_time) ⇒ Object
Try to run job. Returns true/false (work done/work failed)
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/delayed/job.rb', line 95 def run(max_run_time) runtime = Benchmark.realtime do Timeout.timeout(max_run_time.to_i) { invoke_job } destroy end # TODO: warn if runtime > max_run_time ? logger.info "* [JOB] #{name} completed after %.4f" % runtime return true # did work rescue Exception => e reschedule e., e.backtrace log_exception(e) return false # work failed end |
#run_with_lock(max_run_time, worker_name) ⇒ Object
Try to lock and run job. Returns true/false (work done/work failed) or nil if job can’t be locked.
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/delayed/job.rb', line 83 def run_with_lock(max_run_time, worker_name) logger.info "* [JOB] acquiring lock on #{name}" if lock_exclusively!(max_run_time, worker_name) run(max_run_time) else # We did not get the lock, some other worker process must have logger.warn "* [JOB] failed to acquire exclusive lock for #{name}" nil # no work done end end |
#unlock ⇒ Object
Unlock this job (note: not saved to DB)
156 157 158 159 |
# File 'lib/delayed/job.rb', line 156 def unlock self.locked_at = nil self.locked_by = nil end |