Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



71
72
73
74
75
76
77
78
# File 'lib/delayed/worker.rb', line 71

def initialize(options={})
  @quiet = options[:quiet]
  self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
  self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
  self.class.queue        = options[:queue]  || Delayed::DEFAULT_QUEUE
  # renaming the log file works like this only when using RAILS_DEFAULT_LOGGER:
  rename_default_rails_log_if_given(options[:logname])
end

Instance Attribute Details

#name_prefixObject

name_prefix is ignored if name is set directly



30
31
32
# File 'lib/delayed/worker.rb', line 30

def name_prefix
  @name_prefix
end

Class Method Details

.backend=(backend) ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/delayed/worker.rb', line 51

def self.backend=(backend)
  if backend.is_a? Symbol
    require "delayed/backend/#{backend}"
    backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
  end
  @@backend = backend
  silence_warnings { ::Delayed.const_set(:Job, backend) }
end

.guess_backendObject



60
61
62
63
64
65
66
67
68
69
# File 'lib/delayed/worker.rb', line 60

def self.guess_backend
  self.backend ||= if defined?(ActiveRecord)
    :active_record
  elsif defined?(MongoMapper)
    :mongo_mapper
  else
    logger.warn "Could not decide on a backend, defaulting to active_record"
    :active_record
  end
end

Instance Method Details

#nameObject

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker retarts: Workers can# safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.



84
85
86
87
# File 'lib/delayed/worker.rb', line 84

def name
  return @name unless @name.nil?
  "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
end

#name=(val) ⇒ Object

Sets the name of the worker. Setting the name to nil will reset the default worker name



91
92
93
# File 'lib/delayed/worker.rb', line 91

def name=(val)
  @name = val
end

#rename_default_rails_log_if_given(filename) ⇒ Object

rename the default Rails logger file, if requested TODO should we put it inside a Rails own class/module? TODO flush in any case, even if not renaming?

stackoverflow.com/questions/3500200/getting-delayed-job-to-log gist.github.com/833828



41
42
43
44
45
46
47
48
49
# File 'lib/delayed/worker.rb', line 41

def rename_default_rails_log_if_given(filename)
  return unless filename and not filename.empty?

  f = open filename, (File::WRONLY | File::APPEND | File::CREAT)
  f.sync = true
  RAILS_DEFAULT_LOGGER.auto_flushing = true
  # TODO shouldn't we first close whatever was there?
  RAILS_DEFAULT_LOGGER.instance_variable_set(:@log, f)
end

#reschedule(job, time = nil) ⇒ Object

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/delayed/worker.rb', line 159

def reschedule(job, time = nil)
  if (job.attempts += 1) < self.class.max_attempts
    time ||= Job.db_time_now + (job.attempts ** 4) + 5
    job.run_at = time
    job.unlock
    job.save!
  else
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO

    if job.payload_object.respond_to? :on_permanent_failure
      say "Running on_permanent_failure hook"
      failure_method = job.payload_object.method(:on_permanent_failure)
      if failure_method.arity == 1
        failure_method.call(job)
      else
        failure_method.call
      end
    end

    self.class.destroy_failed_jobs ? job.destroy : job.update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end

#run(job) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
# File 'lib/delayed/worker.rb', line 145

def run(job)
  runtime =  Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  say "#{job.name} completed after %.4f" % runtime
  return true  # did work
rescue Exception => e
  handle_failed_job(job, e)
  return false  # work failed
end

#say(text, level = Logger::INFO) ⇒ Object



182
183
184
185
186
# File 'lib/delayed/worker.rb', line 182

def say(text, level = Logger::INFO)
  text = "[Worker(#{name})] #{text}"
  puts text unless @quiet
  logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
end

#startObject



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/delayed/worker.rb', line 95

def start
  say "Starting job worker"

  trap('TERM') { say 'Exiting...'; $exit = true }
  trap('INT')  { say 'Exiting...'; $exit = true }

  loop do
    result = nil

    realtime = Benchmark.realtime do
      result = work_off
    end

    count = result.sum

    break if $exit

    if count.zero?
      sleep(@@sleep_delay)
    else
      say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
    end

    break if $exit
  end

ensure
  Delayed::Job.clear_locks!(name)
end

#work_off(num = 100) ⇒ Object

Do num jobs and return stats on success/failure. Exit early if interrupted.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/delayed/worker.rb', line 127

def work_off(num = 100)
  success, failure = 0, 0

  num.times do
    case reserve_and_run_one_job
    when true
        success += 1
    when false
        failure += 1
    else
      break  # leave if no work could be done
    end
    break if $exit # leave if we're exiting
  end

  return [success, failure]
end