Class: Delayed::Worker
- Inherits:
-
Object
- Object
- Delayed::Worker
- Defined in:
- lib/delayed/worker.rb
Instance Attribute Summary collapse
-
#name_prefix ⇒ Object
name_prefix is ignored if name is set directly.
-
#queues ⇒ Object
Returns the value of attribute queues.
Class Method Summary collapse
Instance Method Summary collapse
- #failed(job) ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #max_attempts(job) ⇒ Object
-
#name ⇒ Object
Every worker has a unique name which by default is the pid of the process.
-
#name=(val) ⇒ Object
Sets the name of the worker.
-
#reschedule(job, time = nil) ⇒ Object
Reschedule the job in the future (when a job fails).
- #run(job) ⇒ Object
- #say(text, level = Logger::INFO) ⇒ Object
- #start ⇒ Object
-
#work_off(num = 100) ⇒ Object
Do num jobs and return stats on success/failure.
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
51 52 53 54 55 56 57 |
# File 'lib/delayed/worker.rb', line 51
#
# Builds a worker and applies the overrides found in +options+.
#
# Recognised keys (all optional):
#   :quiet        - stored in @quiet
#   :min_priority - written to the class-level min_priority setting
#   :max_priority - written to the class-level max_priority setting
#   :sleep_delay  - written to the class-level sleep_delay setting
#   :queues       - queue names for this worker; ["default"] when missing or nil
#
# The three class-level settings are only assigned when their key is present
# in the hash, so passing an explicit nil overwrites the current value while
# omitting the key leaves it untouched.
def initialize(options = {})
  @quiet = options[:quiet]
  self.class.min_priority = options[:min_priority] if options.key?(:min_priority)
  self.class.max_priority = options[:max_priority] if options.key?(:max_priority)
  self.class.sleep_delay  = options[:sleep_delay]  if options.key?(:sleep_delay)
  self.queues = options[:queues] || ["default"]
end
Instance Attribute Details
#name_prefix ⇒ Object
name_prefix is ignored if name is set directly
27 28 29 |
# File 'lib/delayed/worker.rb', line 27
#
# Reader for the prefix that #name puts in front of the generated default
# worker name. Returns nil when no prefix has been assigned.
def name_prefix
  @name_prefix
end
#queues ⇒ Object
Returns the value of attribute queues.
8 9 10 |
# File 'lib/delayed/worker.rb', line 8
#
# Reader for the queue list stored in @queues. Returns nil until a value has
# been assigned.
def queues
  @queues
end
Class Method Details
.backend=(backend) ⇒ Object
31 32 33 34 35 36 37 38 |
# File 'lib/delayed/worker.rb', line 31
#
# Selects the job backend.
#
# A Symbol is treated as a backend name: "delayed/backend/<name>" is required
# and the constant "Delayed::Backend::<Classified name>::Job" is looked up.
# Any other value is used as the job class unchanged.
#
# The resolved class is stored in @@backend and also installed as
# ::Delayed::Job (inside silence_warnings, since the constant may already
# exist).
#
# NOTE(review): classify, constantize and silence_warnings are not defined in
# this file - presumably ActiveSupport extensions; confirm.
def self.backend=(backend)
  job_class =
    case backend
    when Symbol
      require "delayed/backend/#{backend}"
      "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
    else
      backend
    end

  @@backend = job_class
  silence_warnings { ::Delayed.const_set(:Job, job_class) }
end
.guess_backend ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/delayed/worker.rb', line 40
#
# Makes sure a backend is configured and returns it.
#
# When a backend is already set it is returned unchanged. Otherwise the
# backend is chosen by checking which ORM constant is defined:
# ActiveRecord first, then MongoMapper. If neither is defined a warning is
# logged and :active_record is used.
def self.guess_backend
  current = backend
  return current if current

  guessed =
    if defined?(ActiveRecord)
      :active_record
    elsif defined?(MongoMapper)
      :mongo_mapper
    else
      logger.warn "Could not decide on a backend, defaulting to active_record"
      :active_record
    end

  self.backend = guessed
end
Instance Method Details
#failed(job) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/delayed/worker.rb', line 152
#
# Handles a job that will not be retried.
#
# If the job's payload object responds to on_permanent_failure, that hook is
# announced via #say and invoked. Afterwards the job is either destroyed
# (when the class-level destroy_failed_jobs setting is truthy) or kept and
# stamped with failed_at.
def failed(job)
  begin
    if job.payload_object.respond_to?(:on_permanent_failure)
      say 'Running on_permanent_failure hook'
      job.payload_object.on_permanent_failure
    end
  rescue DeserializationError
    # The payload could not be loaded, so there is no hook to run.
  end

  if self.class.destroy_failed_jobs
    job.destroy
  else
    job.update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end
#max_attempts(job) ⇒ Object
171 172 173 |
# File 'lib/delayed/worker.rb', line 171
#
# Returns the attempt limit for +job+: the job's own max_attempts when it is
# truthy, otherwise the class-level max_attempts setting.
def max_attempts(job)
  job_limit = job.max_attempts
  job_limit || self.class.max_attempts
end
#name ⇒ Object
Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
63 64 65 66 |
# File 'lib/delayed/worker.rb', line 63
#
# Returns the worker's name.
#
# An explicitly assigned name (anything other than nil) is returned as-is.
# Otherwise a default is built from the name prefix, the host name and the
# process id. If building that string raises a StandardError, a shorter
# default without the host name is returned instead.
def name
  if @name.nil?
    begin
      "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
    rescue StandardError
      "#{@name_prefix}pid:#{Process.pid}"
    end
  else
    @name
  end
end
#name=(val) ⇒ Object
Sets the name of the worker. Setting the name to nil will reset it to the default worker name.
70 71 72 |
# File 'lib/delayed/worker.rb', line 70
#
# Stores +val+ as the worker's explicit name. Assigning nil clears it.
def name=(val)
  @name = val
end
#reschedule(job, time = nil) ⇒ Object
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
141 142 143 144 145 146 147 148 149 150 |
# File 'lib/delayed/worker.rb', line 141
#
# Records one more attempt for +job+ and either schedules a retry or gives up.
#
# job  - the job that just failed; its attempts counter is incremented.
# time - optional run_at for the retry; defaults to job.reschedule_at.
#
# While the new attempt count is below max_attempts(job) the job gets a new
# run_at, is unlocked and saved (save! may raise). Once the limit is reached
# the job is handed to #failed instead.
def reschedule(job, time = nil)
  attempts_so_far = (job.attempts += 1)

  unless attempts_so_far < max_attempts(job)
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
    return failed(job)
  end

  job.run_at = time || job.reschedule_at
  job.unlock
  job.save!
end
#run(job) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/delayed/worker.rb', line 124
#
# Executes +job+ under the class-level max_run_time limit and destroys it on
# success.
#
# Returns true when the job ran and was destroyed, false when it raised and
# was handed to handle_failed_job. When the payload cannot be deserialized the
# error text is stored in job.last_error, the job is passed to #failed, and
# that call's result is returned.
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  # The name is passed as a format argument rather than interpolated into the
  # format string, so a "%" inside a job name cannot raise after the job has
  # already been destroyed.
  say format('%s completed after %.4f', job.name, runtime)
  return true # did work
rescue DeserializationError => error
  # Message, then the backtrace one frame per line (real newlines).
  job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
  failed(job)
rescue Exception => error
  # NOTE(review): rescuing Exception also traps non-StandardError exceptions
  # raised while the job runs; left unchanged to preserve behavior.
  handle_failed_job(job, error)
  return false # work failed
end
#say(text, level = Logger::INFO) ⇒ Object
165 166 167 168 169 |
# File 'lib/delayed/worker.rb', line 165
#
# Emits a message tagged with this worker's name.
#
# text  - the message body.
# level - logger severity; defaults to Logger::INFO.
#
# The tagged line is printed to stdout unless @quiet is truthy, and, when a
# logger is available, also logged with a timestamp at the given level.
def say(text, level = Logger::INFO)
  line = "[Worker(#{name})] #{text}"
  puts line unless @quiet
  return unless logger

  logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{line}"
end
#start ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/delayed/worker.rb', line 74
#
# Runs the worker's main loop.
#
# TERM and INT handlers are installed that announce the shutdown and set the
# global $exit flag. Each iteration works off a batch of jobs via #work_off
# and then either sleeps for the class-level sleep_delay (nothing processed)
# or reports throughput. $exit is checked right after the batch and again at
# the end of the iteration. On the way out - normally or through an
# exception - this worker's locks are cleared.
def start
  say "Starting job worker"

  %w[TERM INT].each do |signal|
    trap(signal) { say 'Exiting...'; $exit = true }
  end

  loop do
    stats = nil
    elapsed = Benchmark.realtime { stats = work_off }
    processed = stats.sum

    break if $exit

    if processed.zero?
      sleep(self.class.sleep_delay)
    else
      say format("#{processed} jobs processed at %.4f j/s, %d failed ...", processed / elapsed, stats.last)
    end

    break if $exit
  end
ensure
  Delayed::Job.clear_locks!(name)
end
#work_off(num = 100) ⇒ Object
Do num jobs and return stats on success/failure. Exit early if interrupted.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/delayed/worker.rb', line 106
#
# Runs up to +num+ jobs and reports how they went.
#
# Returns a two-element array [successes, failures]. The loop stops early
# when reserve_and_run_one_job returns anything other than true or false
# (no work could be done) or when the global $exit flag is set.
def work_off(num = 100)
  stats = [0, 0] # [successes, failures]

  num.times do
    case reserve_and_run_one_job
    when true  then stats[0] += 1
    when false then stats[1] += 1
    else break # leave if no work could be done
    end

    break if $exit # leave if we're exiting
  end

  stats
end