Class: Resque::Worker

Inherits:
Object
Extended by:
Helpers
Includes:
Helpers
Defined in:
lib/resque/worker.rb

Overview

A Resque Worker processes jobs. On platforms that support fork(2), the worker will fork off a child to process each job. This ensures a clean slate when beginning the next job and cuts down on gradual memory growth as well as low-level failures.

It also ensures workers are always listening to signals from you, their master, and can react accordingly.

Methods included from Helpers

classify, constantize, decode, encode, mongo, mongo_queues, mongo_stats, mongo_workers

Constructor Details

#initialize(*queues) ⇒ Worker

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. With this approach, queues can be added or removed dynamically without restarting workers.



# File 'lib/resque/worker.rb', line 81

def initialize(*queues)
  @queues = queues.map { |queue| queue.to_s.strip }
  validate_queues
end
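
The priority rule described above can be illustrated with a small standalone sketch. It does not use Resque itself: `first_available_job` and the in-memory `pending` hash are hypothetical names introduced only for illustration, and the real worker reserves jobs through `Resque::Job.reserve`.

```ruby
# Hypothetical in-memory model of queue priority; not part of Resque.
# `pending` maps queue names to arrays of waiting jobs.
def first_available_job(queue_names, pending)
  queue_names.each do |name|
    jobs = pending[name]
    # Take from the first queue, in the given order, that has a job.
    return [name, jobs.shift] if jobs && !jobs.empty?
  end
  nil
end

pending = { "critical" => [], "high" => ["resize"], "low" => ["email", "report"] }
order   = %w[critical high low]

first_result  = first_available_job(order, pending) # "high" is checked before "low"
second_result = first_available_job(order, pending) # "high" is now empty, so "low" is used
```

Because every lookup restarts from the first queue name, a busy early queue can starve later ones, which is the intended meaning of "priority" here.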

Instance Attribute Details

#cant_fork ⇒ Object

Boolean indicating whether this worker is unable to fork. Automatically set if a fork(2) fails.

# File 'lib/resque/worker.rb', line 21

def cant_fork
  @cant_fork
end

#job ⇒ Object

Returns the value of attribute job.

# File 'lib/resque/worker.rb', line 25

def job
  @job
end

#to_s ⇒ Object Also known as: worker_id

The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.

# File 'lib/resque/worker.rb', line 483

def to_s
  @to_s ||= "#{hostname}:#{Process.pid}:#{@queues.join(',')}"
end
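
As a rough illustration of the id format produced by `to_s` (hostname, pid, and comma-separated queues joined by colons), the sketch below builds and parses such a string. `build_worker_id` and `parse_worker_id` are hypothetical helpers, not Resque methods, and the host name and pid are made-up sample values.

```ruby
# Sketch only: builds and parses an id in the "hostname:pid:queues" format.
def build_worker_id(hostname, pid, queues)
  "#{hostname}:#{pid}:#{queues.join(',')}"
end

def parse_worker_id(worker_id)
  # Limit the split to three parts so the queue list stays in one piece.
  host, pid, queue_list = worker_id.split(":", 3)
  { host: host, pid: pid.to_i, queues: queue_list.to_s.split(",") }
end

id     = build_worker_id("web01", 4242, %w[high low])
parsed = parse_worker_id(id)
```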

#verbose ⇒ Object

Whether the worker should log basic info to STDOUT.

# File 'lib/resque/worker.rb', line 14

def verbose
  @verbose
end

#very_verbose ⇒ Object

Whether the worker should log lots of info to STDOUT.

# File 'lib/resque/worker.rb', line 17

def very_verbose
  @very_verbose
end

Class Method Details

.all ⇒ Object

Returns an array of all worker objects.

# File 'lib/resque/worker.rb', line 28

def self.all
  mongo_workers.distinct(:worker).map { |w| queues = w.split(','); worker = new(*queues); worker.to_s = w; worker }.compact
end

.attach(worker_id) ⇒ Object

Alias of `find`.

# File 'lib/resque/worker.rb', line 60

def self.attach(worker_id)
  find(worker_id)
end

.exists?(worker_id) ⇒ Boolean

Given a string worker id, returns a boolean indicating whether the worker exists.

Returns:

  • (Boolean)

# File 'lib/resque/worker.rb', line 66

def self.exists?(worker_id)
  not mongo_workers.find_one(:worker => worker_id.to_s).nil?
end

.find(worker_id) ⇒ Object

Returns a single worker object. Accepts a string id.



# File 'lib/resque/worker.rb', line 49

def self.find(worker_id)
  w = mongo_workers.find_one(:worker => worker_id)
  return nil unless w
  queues = w['worker'].split(',')
  worker = new(*queues)
  worker.job = w['working_on'] || {} ## avoid a new call to mongo just to retrieve what's this worker is doing
  worker.to_s = worker_id
  worker
end

.working ⇒ Object

Returns an array of all worker objects currently processing jobs.

# File 'lib/resque/worker.rb', line 34

def self.working
  select = {'working_on' => { '$exists' => true }}
  # select['working_on'] = {"$exists" => true}
  working = mongo_workers.find(select).to_a
  # working.map! {|w| w['worker'] }
  working.map do |w|
    queues = w['worker'].split(',')
    worker = new(*queues)
    worker.to_s = w['worker']
    worker.job = w['working_on'] || {}
    worker 
  end
end

Instance Method Details

#==(other) ⇒ Object

Is this worker the same as another worker?



# File 'lib/resque/worker.rb', line 473

def ==(other)
  to_s == other.to_s
end

#check_payload(payload) ⇒ Object



# File 'lib/resque/worker.rb', line 377

def check_payload(payload)
  case payload.class.to_s
  when 'Class'
    payload.to_s
  when 'Array'
    payload.map { |e| check_payload(e) }
  when 'Hash'
    result = {}
    payload.each { |k,v| result[k] = check_payload(v) }
    result
  else
    return payload
  end
end
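
To show what this recursive conversion does to a nested payload, here is a standalone sketch of the same idea. `stringify_classes` is a hypothetical name; unlike `check_payload`, which compares class names as strings, it uses `case` on the value itself and so also matches subclasses of Array and Hash.

```ruby
# Standalone sketch of the idea behind check_payload: replace Class objects
# with their names so the structure contains only plain, storable values.
def stringify_classes(payload)
  case payload
  when Class then payload.to_s
  when Array then payload.map { |element| stringify_classes(element) }
  when Hash
    payload.each_with_object({}) { |(key, value), out| out[key] = stringify_classes(value) }
  else
    payload
  end
end

payload = { "class" => String, "args" => [1, Integer, { "nested" => Array }] }
cleaned = stringify_classes(payload)
```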

#done_working ⇒ Object

Called when we are done working. Clears our `working_on` state in MongoDB and records that we processed a job.

# File 'lib/resque/worker.rb', line 408

def done_working
  @job = {}
  working_on = {'working_on' => 1}
  mongo_workers.update({:worker =>  self.to_s}, {'$unset' => working_on})      
  processed!
end

#enable_gc_optimizations ⇒ Object

Enables GC optimizations if you’re running REE. See www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow

# File 'lib/resque/worker.rb', line 240

def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end

#failed ⇒ Object

How many failed jobs has this worker seen? Returns an int.

# File 'lib/resque/worker.rb', line 427

def failed
  Stat["failed:#{self}"]
end

#failed! ⇒ Object

Records that we’ve failed a job, both in the global count and in this worker’s count.

# File 'lib/resque/worker.rb', line 432

def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end

#fork ⇒ Object

Not every platform supports fork. Here we do our magic to determine if yours does.

# File 'lib/resque/worker.rb', line 207

def fork
  @cant_fork = true if $TESTING

  return if @cant_fork

  begin
    # IronRuby doesn't support `Kernel.fork` yet
    if Kernel.respond_to?(:fork)
      Kernel.fork
    else
      raise NotImplementedError
    end
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end

#hostname ⇒ Object

The chomp’d hostname of this machine.

# File 'lib/resque/worker.rb', line 489

def hostname
  @hostname ||= `hostname`.chomp
end

#idle? ⇒ Boolean

Boolean - true if idle, false if not.

Returns:

  • (Boolean)

# File 'lib/resque/worker.rb', line 461

def idle?
  state == :idle
end

#inspect ⇒ Object

# File 'lib/resque/worker.rb', line 477

def inspect
  "#<Worker #{to_s}>"
end

#kill_child ⇒ Object

Kills the forked child immediately, without remorse. The job it is processing will not be completed.

# File 'lib/resque/worker.rb', line 290

def kill_child
  if @child
    log! "Killing child at #{@child}"
    if system("ps -o pid,state -p #{@child}")
      Process.kill("KILL", @child) rescue nil
    else
      log! "Child #{@child} not found, restarting."
      shutdown
    end
  end
end

#log(message) ⇒ Object

Log a message to STDOUT if we are verbose or very_verbose.



# File 'lib/resque/worker.rb', line 515

def log(message)
  if verbose
    puts "*** #{message}"
  elsif very_verbose
    time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
    puts "** [#{time}] #$$: #{message}"
  end
end

#log!(message) ⇒ Object

Logs a very verbose message to STDOUT.



# File 'lib/resque/worker.rb', line 525

def log!(message)
  log message if very_verbose
end

#pause_processing ⇒ Object

Stop processing jobs after the current one has completed (if we’re currently running one).

# File 'lib/resque/worker.rb', line 309

def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
end

#paused? ⇒ Boolean

Are we paused?

Returns:

  • (Boolean)

# File 'lib/resque/worker.rb', line 303

def paused?
  @paused
end

#perform(job) ⇒ Object

Processes a given job in the child.



# File 'lib/resque/worker.rb', line 161

def perform(job)
  begin
    run_hook :after_fork, job
    job.perform
  rescue Object => e
    log "#{job.inspect} failed: #{e.inspect}"
    begin
      job.fail(e)
    rescue Object => e
      log "Received exception when reporting failure: #{e.inspect}"
    end
    failed!
  else
    log "done: #{job.inspect}"
  ensure
    yield job if block_given?
  end
end
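
The control flow of `perform` relies on Ruby's `begin`/`rescue`/`else`/`ensure` ordering. The sketch below isolates that pattern with a stand-in job. `FakeJob` and `run_job` are hypothetical, and the sketch rescues `StandardError` rather than `Object` as the source above does.

```ruby
# Hypothetical stand-in for a job; only illustrates begin/rescue/else/ensure.
FakeJob = Struct.new(:name, :should_fail) do
  def perform
    raise "boom" if should_fail
    :ok
  end
end

def run_job(job, events)
  begin
    job.perform
  rescue StandardError => e
    events << "failed: #{job.name} (#{e.message})" # runs only when perform raises
  else
    events << "done: #{job.name}"                   # runs only when perform succeeds
  ensure
    yield job if block_given?                       # runs in both cases
  end
end

events = []
seen   = []
run_job(FakeJob.new("a", false), events) { |job| seen << job.name }
run_job(FakeJob.new("b", true),  events) { |job| seen << job.name }
```

The block passed to `run_job` is called for both the successful and the failing job, mirroring how `perform` yields the job from its `ensure` clause.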

#pid ⇒ Object

Returns the Integer PID of the running worker.

# File 'lib/resque/worker.rb', line 494

def pid
  @pid ||= to_s.split(":")[1].to_i
end

#process(j = nil, &block) ⇒ Object

DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.



# File 'lib/resque/worker.rb', line 152

def process(j = nil, &block)
  return unless j ||= reserve
  working_on j
  perform(j, &block)
ensure
  done_working
end

#processed ⇒ Object

How many jobs has this worker processed? Returns an int.

# File 'lib/resque/worker.rb', line 416

def processed
  Stat["processed:#{self}"]
end

#processed! ⇒ Object

Records that we’ve processed a job, both in the global count and in this worker’s count.

# File 'lib/resque/worker.rb', line 421

def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end

#processing ⇒ Object

Returns a hash explaining the Job we’re currently processing, if any.

# File 'lib/resque/worker.rb', line 451

def processing
  job || {} 
end

#procline(string) ⇒ Object

Given a string, sets the procline ($0) and logs. Procline is always in the format of:

resque-VERSION: STRING


# File 'lib/resque/worker.rb', line 509

def procline(string)
  $0 = "resque-#{Resque::Version}: #{string}"
  log! $0
end

#prune_dead_workers ⇒ Object

Looks for any workers which should be running on this server and, if they’re not, removes their registration from MongoDB.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in the database.

By checking the stored state against the actual environment, we can determine whether it is stale and clean it up a bit.

# File 'lib/resque/worker.rb', line 330

def prune_dead_workers
  all_workers = Worker.all
  known_workers = worker_pids unless all_workers.empty?
  all_workers.each do |worker|
    host, pid, queues = worker.to_s.split(':')
    next unless host == hostname
    next if known_workers.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
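
The pruning decision itself can be sketched without a database or a process table by passing both in as plain data. `stale_worker_ids` is a hypothetical helper and the ids are made-up samples. As in the source above, pids are compared as strings.

```ruby
# Sketch of the pruning decision, with the registered ids and the live
# pid list supplied as arguments instead of read from MongoDB and `ps`.
def stale_worker_ids(worker_ids, local_host, live_pids)
  worker_ids.select do |worker_id|
    host, pid, _queues = worker_id.split(":")
    # Only workers registered for this host can be checked locally.
    host == local_host && !live_pids.include?(pid)
  end
end

registered = ["web01:100:high", "web01:200:low", "web02:300:high"]
stale      = stale_worker_ids(registered, "web01", ["100"])
```

Workers registered from other hosts are never selected, since this machine has no way to tell whether their processes are still alive.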

#queues ⇒ Object

Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.

# File 'lib/resque/worker.rb', line 201

def queues
  @queues[0] == "*" ? Resque.queues.sort : Resque.queues(@queues).sort
end

#register_signal_handlers ⇒ Object

Registers the various signal handlers a worker responds to.

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing.

USR1: Kill the forked child immediately, continue processing jobs.

USR2: Don’t process any new jobs.

CONT: Start processing jobs again after a USR2.

# File 'lib/resque/worker.rb', line 254

def register_signal_handlers
  trap('TERM') { shutdown!  }
  trap('INT')  { shutdown!  }

  begin
    trap('QUIT') { shutdown   }
    trap('USR1') { kill_child }
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log! "Registered signals"
end

#register_worker ⇒ Object

Registers ourself as a worker. Useful when entering the worker lifecycle on startup.

# File 'lib/resque/worker.rb', line 344

def register_worker
  mongo_workers.insert(:worker => self.to_s)
  started!
end

#reserve ⇒ Object

Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.

# File 'lib/resque/worker.rb', line 182

def reserve
  queues.each do |queue|
    log! "Checking #{queue}"
    if j = Resque::Job.reserve(queue)
      log! "Found job on #{queue}"
      return j
    end
  end

  nil
rescue Exception => e
  log "Error reserving job: #{e.inspect}"
  log e.backtrace.join("\n")
  raise e
end

#run_hook(name, *args) ⇒ Object

Runs a named hook, passing along any arguments.



# File 'lib/resque/worker.rb', line 350

def run_hook(name, *args)
  return unless hook = Resque.send(name)
  msg = "Running #{name} hook"
  msg << " with #{args.inspect}" if args.any?
  log msg

  args.any? ? hook.call(*args) : hook.call
end
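
A minimal sketch of the same dispatch pattern, assuming hooks are stored as lambdas in a plain hash. The `HOOKS` registry and `call_hook` are hypothetical. In the source above the hook is looked up with `Resque.send(name)` instead.

```ruby
# Hypothetical hook registry keyed by hook name.
HOOKS = {
  before_first_fork: -> { :booted },
  after_fork:        ->(job) { "forked for #{job}" }
}

def call_hook(name, *args)
  hook = HOOKS[name]
  return unless hook                       # no hook registered: do nothing
  args.any? ? hook.call(*args) : hook.call # pass arguments only when given
end

no_args   = call_hook(:before_first_fork)
with_args = call_hook(:after_fork, "job-1")
missing   = call_hook(:not_registered)
```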

#shutdown ⇒ Object

Schedule this worker for shutdown. Will finish processing the current job.

# File 'lib/resque/worker.rb', line 272

def shutdown
  log 'Exiting...'
  @shutdown = true
end

#shutdown! ⇒ Object

Kill the child and shutdown immediately.

# File 'lib/resque/worker.rb', line 278

def shutdown!
  shutdown
  kill_child
end

#shutdown? ⇒ Boolean

Should this worker shutdown as soon as the current job is finished?

Returns:

  • (Boolean)

# File 'lib/resque/worker.rb', line 284

def shutdown?
  @shutdown
end

#started ⇒ Object

What time did this worker start? Returns an instance of `Time`.

# File 'lib/resque/worker.rb', line 438

def started
  worker = mongo_workers.find_one(:worker => self.to_s)
  return nil if !worker
  worker['started']
end

#started! ⇒ Object

Records in MongoDB the time at which we started.

# File 'lib/resque/worker.rb', line 445

def started!
  started = {'started' => Time.now }
  mongo_workers.update({:worker => self.to_s},  {'$set' => started})
end

#startup ⇒ Object

Runs all the methods needed when a worker begins its lifecycle.

# File 'lib/resque/worker.rb', line 226

def startup
  enable_gc_optimizations
  register_signal_handlers
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end

#state ⇒ Object

Returns a symbol representing the current worker state, which can be either :working or :idle.

# File 'lib/resque/worker.rb', line 467

def state
  worker = mongo_workers.find_one(:worker => self.to_s)
  worker ? :working : :idle
end

#unpause_processing ⇒ Object

Start processing jobs again after a pause.

# File 'lib/resque/worker.rb', line 315

def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
end

#unregister_worker ⇒ Object

Unregisters ourself as a worker. Useful when shutting down.

# File 'lib/resque/worker.rb', line 360

def unregister_worker
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    j = Job.new(hash[:queue], hash[:payload])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    j.worker = self
    j.fail(DirtyExit.new)
  end

  mongo_workers.remove(:worker => self.to_s)

  Stat.clear("processed:#{self}")
  Stat.clear("failed:#{self}")
end

#validate_queues ⇒ Object

A worker must be given a queue, otherwise it won’t know what to do with itself.

You probably never need to call this.

# File 'lib/resque/worker.rb', line 90

def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end

#work(interval = 5.0, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned, and this worker is registered.

  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed a float representing the polling interval in seconds. The default is 5 seconds, but for a semi-active site you may want to use a smaller value. An interval of zero makes the worker exit as soon as no job is available.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.



# File 'lib/resque/worker.rb', line 112

def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  job_count = 0
  startup
  loop do
    break if shutdown?

    if not paused? and job = reserve
      log "got: #{job.inspect}"
      run_hook :before_fork, job
      working_on job

      if @child = fork
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        Process.wait
      else
        procline "Processing #{job.queue} since #{Time.now.to_s} (#{job_count} so far)"
        perform(job, &block)
        job_count += 1
        exit! unless @cant_fork
      end

      done_working
      @child = nil
    else
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end

ensure
  unregister_worker
end
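
The three lifecycle phases can be seen in a simplified, non-forking model of this loop. `SketchWorker` and everything inside it are hypothetical. It keeps jobs in an array and omits signals, hooks, pausing, and MongoDB entirely.

```ruby
# Simplified, non-forking model of the work loop.
class SketchWorker
  attr_reader :log

  def initialize(jobs)
    @jobs     = jobs
    @shutdown = false
    @log      = []
  end

  def work(interval = 0.0)
    interval = Float(interval)
    @log << "startup"
    loop do
      break if @shutdown
      if (job = @jobs.shift)
        @log << "processed #{job}"
        yield job if block_given?
      else
        break if interval.zero? # nothing left and no polling requested
        sleep interval
      end
    end
  ensure
    @log << "teardown"          # always runs, as unregister_worker does above
  end
end

worker    = SketchWorker.new(%w[job1 job2])
completed = []
worker.work(0) { |job| completed << job }
```

Passing an interval of zero drains the queue once and returns, which is convenient in tests. A positive interval keeps polling until shutdown is requested.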

#worker_pids ⇒ Object

Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File 'lib/resque/worker.rb', line 500

def worker_pids
  `ps -A -o pid,command | grep [r]esque | grep -v "resque-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
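
The filtering that the shell pipeline performs can be sketched in plain Ruby against captured text, which avoids shelling out. `resque_worker_pids` is a hypothetical helper, and `SAMPLE_PS_OUTPUT` is invented sample data shaped like `ps -A -o pid,command` output.

```ruby
# Invented sample of `ps -A -o pid,command`-style output.
SAMPLE_PS_OUTPUT = <<~PS
  101 resque-VERSION: Waiting for high,low
  202 resque-web
  303 ruby script/server
  404 resque-VERSION: Processing high since 1300000000
PS

def resque_worker_pids(ps_output)
  ps_output.lines
           .select { |line| line.include?("resque") && !line.include?("resque-web") }
           .map { |line| line.split(" ")[0] } # first column is the pid, kept as a string
end

pids = resque_worker_pids(SAMPLE_PS_OUTPUT)
```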

#working? ⇒ Boolean

Boolean - true if working, false if not.

Returns:

  • (Boolean)

# File 'lib/resque/worker.rb', line 456

def working?
  state == :working
end

#working_on(j) ⇒ Object

Given a job, records in MongoDB that we’re working on it. Useful for seeing what workers are doing and when.

# File 'lib/resque/worker.rb', line 394

def working_on(j)
  j.worker = self
  data = {
    'queue'   => j.queue,
    'run_at'  => Time.now.to_s,
    'payload' => check_payload(j.payload)
  }
  @job = data
  working_on = {'working_on' => data}
  mongo_workers.update({:worker => self.to_s},  {'$set' => working_on}, :upsert => true )
end