Class: Delayed::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/delayed/job.rb

Overview

A job object that is persisted to the database. Contains the work object as a YAML field.

Constant Summary collapse

ParseObjectFromYaml =
/\!ruby\/\w+\:([^\s]+)/
@@max_attempts =
25
@@max_run_time =
4.hours

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.clear_locks!(worker_name) ⇒ Object

When a worker is exiting, make sure we don’t have any locked jobs.



36
37
38
# File 'lib/delayed/job.rb', line 36

def self.clear_locks!(worker_name)
  update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

.enqueue(*args, &block) ⇒ Object

Add a job to the queue



110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/delayed/job.rb', line 110

def self.enqueue(*args, &block)
  object = block_given? ? EvaledJob.new(&block) : args.shift

  unless object.respond_to?(:perform) || block_given?
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  priority = args.first || 0
  run_at   = args[1]

  Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end

.find_available(worker_name, limit = 5, max_run_time = max_run_time) ⇒ Object

Find a few candidate jobs to run (in case some immediately get locked by others).



124
125
126
127
128
129
130
131
132
# File 'lib/delayed/job.rb', line 124

def self.find_available(worker_name, limit = 5, max_run_time = max_run_time)
  scope = self.ready_to_run(worker_name, max_run_time)
  scope = scope.scoped(:conditions => ['priority >= ?', min_priority]) if min_priority
  scope = scope.scoped(:conditions => ['priority <= ?', max_priority]) if max_priority

  ActiveRecord::Base.silence do
    scope.by_priority.all(:limit => limit)
  end
end

Instance Method Details

#failed?Boolean Also known as: failed

Returns:

  • (Boolean)


40
41
42
# File 'lib/delayed/job.rb', line 40

def failed?
  failed_at
end

#invoke_jobObject

Moved into its own method so that new_relic can trace it.



168
169
170
# File 'lib/delayed/job.rb', line 168

def invoke_job
  payload_object.perform
end

#lock_exclusively!(max_run_time, worker) ⇒ Object

Lock this job for this worker. Returns true if we have the lock, false otherwise.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/delayed/job.rb', line 136

def lock_exclusively!(max_run_time, worker)
  now = self.class.db_time_now
  affected_rows = if locked_by != worker
    # We don't own this job so we will update the locked_by name and the locked_at
    self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
  else
    # We already own this job, this may happen if the job queue crashes.
    # Simply resume and update the locked_at
    self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
  end
  if affected_rows == 1
    self.locked_at    = now
    self.locked_by    = worker
    return true
  else
    return false
  end
end

#log_exception(error) ⇒ Object

This is a good hook if you need to report job processing errors in additional or different ways



162
163
164
165
# File 'lib/delayed/job.rb', line 162

def log_exception(error)
  logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
  logger.error(error)
end

#nameObject



49
50
51
52
53
54
55
56
57
58
# File 'lib/delayed/job.rb', line 49

def name
  @name ||= begin
    payload = payload_object
    if payload.respond_to?(:display_name)
      payload.display_name
    else
      payload.class.name
    end
  end
end

#payload_objectObject



45
46
47
# File 'lib/delayed/job.rb', line 45

def payload_object
  @payload_object ||= deserialize(self['handler'])
end

#payload_object=(object) ⇒ Object



60
61
62
# File 'lib/delayed/job.rb', line 60

def payload_object=(object)
  self['handler'] = object.to_yaml
end

#reschedule(message, backtrace = [], time = nil) ⇒ Object

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/delayed/job.rb', line 66

def reschedule(message, backtrace = [], time = nil)
  self.last_error   = message + "\n" + backtrace.join("\n")

  if (self.attempts += 1) < max_attempts
    time ||= Job.db_time_now + (attempts ** 4) + 5

    self.run_at       = time
    self.unlock
    save!
  else
    logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consecutive failures."
    destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
  end
end

#run(max_run_time) ⇒ Object

Try to run job. Returns true/false (work done/work failed)



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/delayed/job.rb', line 95

def run(max_run_time)
  runtime =  Benchmark.realtime do
    Timeout.timeout(max_run_time.to_i) { invoke_job }
    destroy
  end
  # TODO: warn if runtime > max_run_time ?
  logger.info "* [JOB] #{name} completed after %.4f" % runtime
  return true  # did work
rescue Exception => e
  reschedule e.message, e.backtrace
  log_exception(e)
  return false  # work failed
end

#run_with_lock(max_run_time, worker_name) ⇒ Object

Try to lock and run job. Returns true/false (work done/work failed) or nil if job can’t be locked.



83
84
85
86
87
88
89
90
91
92
# File 'lib/delayed/job.rb', line 83

def run_with_lock(max_run_time, worker_name)
  logger.info "* [JOB] acquiring lock on #{name}"
  if lock_exclusively!(max_run_time, worker_name)
    run(max_run_time)
  else
    # We did not get the lock, some other worker process must have
    logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
    nil # no work done
  end
end

#unlockObject

Unlock this job (note: not saved to DB)



156
157
158
159
# File 'lib/delayed/job.rb', line 156

def unlock
  self.locked_at    = nil
  self.locked_by    = nil
end