Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



47
48
49
50
51
# File 'lib/delayed/worker.rb', line 47

# Builds a worker from an options hash.
#
# options[:quiet]        - stored in @quiet; #say skips console output when it is truthy.
# options[:min_priority] - when the key is present, assigned to the class-level min_priority.
# options[:max_priority] - when the key is present, assigned to the class-level max_priority.
#
# The priority settings are written on the class (self.class), not on this
# instance, and are applied whenever the key exists, even if its value is nil.
def initialize(options = {})
  @quiet = options[:quiet]

  worker_class = self.class
  worker_class.min_priority = options[:min_priority] if options.key?(:min_priority)
  worker_class.max_priority = options[:max_priority] if options.key?(:max_priority)
end

Instance Attribute Details

#name_prefix ⇒ Object

name_prefix is ignored if name is set directly



23
24
25
# File 'lib/delayed/worker.rb', line 23

# Reader for @name_prefix, the string #name puts in front of the generated
# worker name. It has no effect once a name has been assigned directly.
def name_prefix; @name_prefix; end

Class Method Details

.backend=(backend) ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/delayed/worker.rb', line 27

# Selects the job backend class.
#
# backend - either a backend class, or a Symbol naming one. For a Symbol the
#           file "delayed/backend/<symbol>" is required and the constant
#           "Delayed::Backend::<Classified symbol>::Job" is looked up.
#
# The resolved backend is stored in @@backend and also installed as
# ::Delayed::Job (inside silence_warnings, since the constant may be reassigned).
# NOTE(review): classify/constantize/silence_warnings are presumably the
# ActiveSupport core extensions — confirm they are loaded before this is called.
def self.backend=(backend)
  if backend.is_a?(Symbol)
    backend_name = backend.to_s
    require "delayed/backend/#{backend_name}"
    backend = "Delayed::Backend::#{backend_name.classify}::Job".constantize
  end

  @@backend = backend
  silence_warnings do
    ::Delayed.const_set(:Job, backend)
  end
end

.guess_backend ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/delayed/worker.rb', line 36

# Picks a backend when none has been configured yet.
#
# Leaves an already-set backend untouched. Otherwise chooses :active_record
# if ActiveRecord is defined, :mongo_mapper if MongoMapper is defined, and
# falls back to :active_record (with a warning) when neither is loaded.
#
# Returns the backend that is in effect after the call.
def self.guess_backend
  self.backend ||= if defined?(ActiveRecord)
    :active_record
  elsif defined?(MongoMapper)
    :mongo_mapper
  else
    # The logger is optional (#say guards it the same way); without this guard
    # the fallback would raise NoMethodError instead of picking a default.
    logger.warn "Could not decide on a backend, defaulting to active_record" if logger
    :active_record
  end
end

Instance Method Details

#name ⇒ Object

Every worker has a unique name, which by default is built from the hostname and the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.



57
58
59
60
# File 'lib/delayed/worker.rb', line 57

# Returns the worker's name.
#
# A directly assigned name (any non-nil @name) is returned as is. Otherwise
# the name is generated as "<prefix>host:<hostname> pid:<pid>"; if building
# that string raises a StandardError (for example the hostname lookup
# fails), it degrades to "<prefix>pid:<pid>".
def name
  if @name.nil?
    begin
      "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
    rescue StandardError
      "#{@name_prefix}pid:#{Process.pid}"
    end
  else
    @name
  end
end

#name=(val) ⇒ Object

Sets the name of the worker. Setting the name to nil will reset the default worker name



64
65
66
# File 'lib/delayed/worker.rb', line 64

# Assigns the worker name. Passing nil clears it, so #name goes back to
# generating the default name.
def name=(val); @name = val; end

#reschedule(job, time = nil) ⇒ Object

Reschedule the job in the future (when a job fails). The delay grows polynomially with the number of failed attempts (attempts ** 4 + 5, added to the current database time) unless an explicit time is given.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/delayed/worker.rb', line 133

# Records one more failed attempt on the job and either schedules a retry or
# gives up on it.
#
# job  - the job that failed; its attempts counter is incremented first.
# time - optional run_at for the retry. When omitted, the retry is scheduled at
#        Job.db_time_now + attempts ** 4 + 5.
#
# While the incremented attempt count is below the class-level max_attempts
# the job is unlocked and saved with the new run_at. Otherwise the failure is
# permanent: the payload's on_permanent_failure hook is run if it defines one,
# and the job is destroyed or stamped with failed_at, depending on the
# class-level destroy_failed_jobs flag.
def reschedule(job, time = nil)
  attempts_made = (job.attempts += 1)

  if attempts_made < self.class.max_attempts
    job.run_at = time || (Job.db_time_now + (job.attempts ** 4) + 5)
    job.unlock
    return job.save!
  end

  say "* [JOB] PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO

  if job.payload_object.respond_to?(:on_permanent_failure)
    say "* [JOB] Running on_permanent_failure hook"
    job.payload_object.on_permanent_failure
  end

  if self.class.destroy_failed_jobs
    job.destroy
  else
    job.update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end

#run(job) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/delayed/worker.rb', line 118

# Runs a single job and reports whether it succeeded.
#
# The job is invoked under a timeout of the class-level max_run_time (in
# seconds, via to_i) and destroyed once it finishes. Returns true when the job
# ran and was destroyed, false when anything was raised; in that case the job
# and the exception are passed to handle_failed_job.
#
# NOTE(review): the rescue covers Exception, so signals or exits raised while
# the job runs are also handled as a job failure — confirm this is intended.
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  # TODO: warn if runtime > max_run_time ?
  # Only the runtime goes through format: the worker name is free-form text,
  # and a '%' in it used to raise ArgumentError here, which the rescue below
  # then reported as a failure of an already completed (and destroyed) job.
  say "* [JOB] #{name} completed after #{format('%.4f', runtime)}"
  return true  # did work
rescue Exception => e
  handle_failed_job(job, e)
  return false  # work failed
end

#say(text, level = Logger::INFO) ⇒ Object



151
152
153
154
# File 'lib/delayed/worker.rb', line 151

# Reports a message on the console and, if a logger is available, in the log.
#
# text  - the message.
# level - severity passed to the logger (defaults to Logger::INFO).
#
# Console output is skipped when @quiet is set. The log entry is the message
# prefixed with the current time formatted as '%FT%T%z'.
def say(text, level = Logger::INFO)
  puts(text) unless @quiet
  return unless logger

  timestamp = Time.now.strftime('%FT%T%z')
  logger.add(level, "#{timestamp}: #{text}")
end

#start ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/delayed/worker.rb', line 68

# Main loop of the worker.
#
# Installs TERM and INT handlers that set the global $exit flag, then keeps
# calling #work_off. After each batch it either sleeps for @@sleep_delay (no
# job was processed) or reports the throughput and the number of failures.
# The loop ends once $exit is set; it is checked both before and after the
# sleep/report step. On the way out, including when an exception propagates,
# the locks held under this worker's name are cleared.
#
# NOTE(review): #say is called from inside the signal handlers; whether the
# logger can be used in trap context depends on the Ruby version — confirm.
def start
  say "*** Starting job worker #{name}"

  %w[TERM INT].each do |signal|
    trap(signal) { say 'Exiting...'; $exit = true }
  end

  loop do
    stats = nil
    elapsed = Benchmark.realtime { stats = work_off }
    processed = stats.sum

    break if $exit

    if processed.zero?
      sleep(@@sleep_delay)
    else
      say "#{processed} jobs processed at %.4f j/s, %d failed ..." % [processed / elapsed, stats.last]
    end

    break if $exit
  end

ensure
  Delayed::Job.clear_locks!(name)
end

#work_off(num = 100) ⇒ Object

Do num jobs and return stats on success/failure. Exit early if interrupted.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/delayed/worker.rb', line 100

# Processes up to num jobs, one at a time, via reserve_and_run_one_job.
#
# A result of true counts as a success and false as a failure; any other
# result means no work could be done and ends the batch early. The batch
# also stops after the current job once the global $exit flag is set.
#
# Returns a two-element array: [successes, failures].
def work_off(num = 100)
  succeeded = 0
  failed = 0

  num.times do
    case reserve_and_run_one_job
    when true  then succeeded += 1
    when false then failed += 1
    else break # no work could be done, stop the batch
    end

    break if $exit # the worker was asked to shut down
  end

  [succeeded, failed]
end