Class: Resque::Worker
- Inherits: Object
  - Object
  - Resque::Worker
- Extended by: Helpers
- Includes: Helpers
- Defined in: lib/resque/worker.rb
Overview
A Resque Worker processes jobs. On platforms that support fork(2), the worker forks a child to process each job. This gives every job a clean slate and cuts down on gradual memory growth as well as low-level failures.
It also ensures workers are always listening to signals from you, their master, and can react accordingly.
Instance Attribute Summary
- #cant_fork ⇒ Object: Boolean indicating whether this worker cannot fork.
- #job ⇒ Object: Returns the value of attribute job.
- #to_s ⇒ Object (also: #worker_id): The string representation is the same as the id for this worker instance.
- #verbose ⇒ Object: Whether the worker should log basic info to STDOUT.
- #very_verbose ⇒ Object: Whether the worker should log lots of info to STDOUT.
Class Method Summary
- .all ⇒ Object: Returns an array of all worker objects.
- .attach(worker_id) ⇒ Object: Alias of `find`.
- .exists?(worker_id) ⇒ Boolean: Given a string worker id, returns a boolean indicating whether the worker exists.
- .find(worker_id) ⇒ Object: Returns a single worker object.
- .working ⇒ Object: Returns an array of all worker objects currently processing jobs.
Instance Method Summary
- #==(other) ⇒ Object: Is this worker the same as another worker?
- #check_payload(payload) ⇒ Object
- #done_working ⇒ Object: Called when we are done working. Clears our `working_on` state and tells MongoDB we processed a job.
- #enable_gc_optimizations ⇒ Object: Enables GC optimizations if you’re running REE.
- #failed ⇒ Object: How many failed jobs has this worker seen? Returns an int.
- #failed! ⇒ Object: Tells Redis we’ve failed a job.
- #fork ⇒ Object: Not every platform supports fork.
- #hostname ⇒ Object: chomp’d hostname of this machine.
- #idle? ⇒ Boolean: true if idle, false if not.
- #initialize(*queues) ⇒ Worker (constructor): Workers should be initialized with an array of string queue names.
- #inspect ⇒ Object
- #kill_child ⇒ Object: Kills the forked child immediately, without remorse.
- #log(message) ⇒ Object: Log a message to STDOUT if we are verbose or very_verbose.
- #log!(message) ⇒ Object: Logs a very verbose message to STDOUT.
- #pause_processing ⇒ Object: Stop processing jobs after the current one has completed (if we’re currently running one).
- #paused? ⇒ Boolean: Are we paused?
- #perform(job) ⇒ Object: Processes a given job in the child.
- #pid ⇒ Object: Returns Integer PID of running worker.
- #process(j = nil, &block) ⇒ Object: DEPRECATED.
- #processed ⇒ Object: How many jobs has this worker processed? Returns an int.
- #processed! ⇒ Object: Tell Redis we’ve processed a job.
- #processing ⇒ Object: Returns a hash explaining the Job we’re currently processing, if any.
- #procline(string) ⇒ Object: Given a string, sets the procline ($0) and logs.
- #prune_dead_workers ⇒ Object: Looks for any workers which should be running on this server and, if they’re not, removes them from MongoDB.
- #queues ⇒ Object: Returns a list of queues to use when searching for a job.
- #register_signal_handlers ⇒ Object: Registers the various signal handlers a worker responds to.
- #register_worker ⇒ Object: Registers ourself as a worker.
- #reserve ⇒ Object: Attempts to grab a job off one of the provided queues.
- #run_hook(name, *args) ⇒ Object: Runs a named hook, passing along any arguments.
- #shutdown ⇒ Object: Schedule this worker for shutdown.
- #shutdown! ⇒ Object: Kill the child and shutdown immediately.
- #shutdown? ⇒ Boolean: Should this worker shutdown as soon as current job is finished?
- #started ⇒ Object: What time did this worker start? Returns an instance of `Time`.
- #started! ⇒ Object: Tell MongoDB we’ve started.
- #startup ⇒ Object: Runs all the methods needed when a worker begins its lifecycle.
- #state ⇒ Object: Returns a symbol representing the current worker state, which can be either :working or :idle.
- #unpause_processing ⇒ Object: Start processing jobs again after a pause.
- #unregister_worker ⇒ Object: Unregisters ourself as a worker.
- #validate_queues ⇒ Object: A worker must be given a queue, otherwise it won’t know what to do with itself.
- #work(interval = 5.0, &block) ⇒ Object: This is the main workhorse method.
- #worker_pids ⇒ Object: Returns an array of string pids of all the other workers on this machine.
- #working? ⇒ Boolean: true if working, false if not.
- #working_on(j) ⇒ Object: Given a job, tells MongoDB we’re working on it.
Methods included from Helpers
classify, constantize, decode, encode, mongo, mongo_queues, mongo_stats, mongo_workers
Constructor Details
#initialize(*queues) ⇒ Worker
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*”, this Worker will operate on all queues in alphabetical order. With “*”, queues can be added or removed dynamically without restarting workers.
# File 'lib/resque/worker.rb', line 81
def initialize(*queues)
  @queues = queues.map { |queue| queue.to_s.strip }
  validate_queues
end
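The priority rule described above can be illustrated without a running worker. The following is only a sketch: `next_job` and the in-memory `store` hash are hypothetical stand-ins, not part of Resque, and they model the documented behavior rather than the library's implementation.

```ruby
# Hypothetical in-memory stand-in for the job store (not Resque's API).
# Maps queue name => array of pending jobs, oldest first.
def next_job(requested, store)
  # A single "*" means "every known queue, in alphabetical order".
  names = requested == ['*'] ? store.keys.sort : requested
  names.each do |name|
    job = (store[name] || []).shift
    return [name, job] if job
  end
  nil # nothing pending on any queue
end

store = { 'low' => ['resize'], 'critical' => ['charge', 'refund'] }
order = []
# The scan restarts from the first queue after every job.
while (found = next_job(%w[critical low], store))
  order << found
end
p order
```

Both `critical` jobs are taken before the `low` job, because every scan starts again at the first queue name.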
Instance Attribute Details
#cant_fork ⇒ Object
Boolean indicating whether this worker cannot fork. Set automatically when a fork(2) attempt fails.
# File 'lib/resque/worker.rb', line 21
def cant_fork
  @cant_fork
end
#job ⇒ Object
Returns the value of attribute job.
# File 'lib/resque/worker.rb', line 25
def job
  @job
end
#to_s ⇒ Object Also known as: worker_id
The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.
# File 'lib/resque/worker.rb', line 483
def to_s
  @to_s ||= "#{hostname}:#{Process.pid}:#{@queues.join(',')}"
end
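The id therefore has the shape `hostname:pid:queue1,queue2`. A minimal sketch of building and parsing such an id follows; `build_worker_id` and `parse_worker_id` are hypothetical helper names used only for illustration, and the split limit of 3 is an assumption of this sketch.

```ruby
require 'socket'

# Build an id in the same "hostname:pid:queues" shape that #to_s produces.
def build_worker_id(hostname, pid, queues)
  "#{hostname}:#{pid}:#{queues.join(',')}"
end

# Split an id back into its parts. The limit of 3 keeps any ":" inside
# a queue name from being split further (an assumption of this sketch).
def parse_worker_id(id)
  host, pid, queues = id.split(':', 3)
  { host: host, pid: pid.to_i, queues: queues.to_s.split(',') }
end

id = build_worker_id(Socket.gethostname, Process.pid, %w[critical low])
p parse_worker_id(id)
```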
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT
# File 'lib/resque/worker.rb', line 14
def verbose
  @verbose
end
#very_verbose ⇒ Object
Whether the worker should log lots of info to STDOUT
# File 'lib/resque/worker.rb', line 17
def very_verbose
  @very_verbose
end
Class Method Details
.all ⇒ Object
Returns an array of all worker objects.
# File 'lib/resque/worker.rb', line 28
def self.all
  mongo_workers.distinct(:worker).map { |w| queues = w.split(','); worker = new(*queues); worker.to_s = w; worker }.compact
end
.attach(worker_id) ⇒ Object
Alias of `find`.
# File 'lib/resque/worker.rb', line 60
def self.attach(worker_id)
  find(worker_id)
end
.exists?(worker_id) ⇒ Boolean
Given a string worker id, returns a boolean indicating whether the worker exists.
# File 'lib/resque/worker.rb', line 66
def self.exists?(worker_id)
  not mongo_workers.find_one(:worker => worker_id.to_s).nil?
end
.find(worker_id) ⇒ Object
Returns a single worker object. Accepts a string id.
# File 'lib/resque/worker.rb', line 49
def self.find(worker_id)
  w = mongo_workers.find_one(:worker => worker_id)
  return nil unless w
  queues = w['worker'].split(',')
  worker = new(*queues)
  worker.job = w['working_on'] || {} ## avoid a new call to mongo just to retrieve what's this worker is doing
  worker.to_s = worker_id
  worker
end
.working ⇒ Object
Returns an array of all worker objects currently processing jobs.
# File 'lib/resque/worker.rb', line 34
def self.working
  select = {'working_on' => { '$exists' => true }}
  # select['working_on'] = {"$exists" => true}
  working = mongo_workers.find(select).to_a
  # working.map! {|w| w['worker'] }
  working.map do |w|
    queues = w['worker'].split(',')
    worker = new(*queues)
    worker.to_s = w['worker']
    worker.job = w['working_on'] || {}
    worker
  end
end
Instance Method Details
#==(other) ⇒ Object
Is this worker the same as another worker?
# File 'lib/resque/worker.rb', line 473
def ==(other)
  to_s == other.to_s
end
#check_payload(payload) ⇒ Object
# File 'lib/resque/worker.rb', line 377
def check_payload(payload)
  case payload.class.to_s
  when 'Class'
    payload.to_s
  when 'Array'
    payload.map { |e| check_payload(e) }
  when 'Hash'
    result = {}
    payload.each { |k,v| result[k] = check_payload(v) }
    result
  else
    return payload
  end
end
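This method carries no description. Judging from the listing, it walks a payload recursively and replaces any `Class` object with its name, presumably so the payload can be stored as plain data. The sketch below is a standalone copy of that logic with the same behavior; the sample payload is made up for illustration.

```ruby
# Standalone copy of the listing's logic so it can run outside a worker.
def check_payload(payload)
  case payload.class.to_s
  when 'Class' then payload.to_s                        # String => "String"
  when 'Array' then payload.map { |e| check_payload(e) }
  when 'Hash'
    payload.each_with_object({}) { |(k, v), out| out[k] = check_payload(v) }
  else
    payload                                             # everything else passes through
  end
end

payload = { 'class' => String, 'args' => [1, { 'type' => Integer }] }
p check_payload(payload)
```

After the call, `String` and `Integer` appear as the strings "String" and "Integer", while the number `1` is left untouched.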
#done_working ⇒ Object
Called when we are done working. Clears our `working_on` state and tells MongoDB we processed a job.
# File 'lib/resque/worker.rb', line 408
def done_working
  @job = {}
  working_on = {'working_on' => 1}
  mongo_workers.update({:worker => self.to_s}, {'$unset' => working_on})
  processed!
end
#enable_gc_optimizations ⇒ Object
Enables GC Optimizations if you’re running REE. www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
# File 'lib/resque/worker.rb', line 240
def enable_gc_optimizations
  if GC.respond_to?(:copy_on_write_friendly=)
    GC.copy_on_write_friendly = true
  end
end
#failed ⇒ Object
How many failed jobs has this worker seen? Returns an int.
# File 'lib/resque/worker.rb', line 427
def failed
  Stat["failed:#{self}"]
end
#failed! ⇒ Object
Tells Redis we’ve failed a job.
# File 'lib/resque/worker.rb', line 432
def failed!
  Stat << "failed"
  Stat << "failed:#{self}"
end
#fork ⇒ Object
Not every platform supports fork. Here we do our magic to determine if yours does.
# File 'lib/resque/worker.rb', line 207
def fork
  @cant_fork = true if $TESTING

  return if @cant_fork

  begin
    # IronRuby doesn't support `Kernel.fork` yet
    if Kernel.respond_to?(:fork)
      Kernel.fork
    else
      raise NotImplementedError
    end
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end
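The same fork-or-fall-back idea can be sketched outside the worker. `run_isolated` below is a hypothetical helper, not part of Resque: it runs a block in a forked child where the platform supports fork and runs it inline otherwise.

```ruby
# Run the block in a forked child when the platform allows it, otherwise
# run it inline. Returns :forked, :child_failed or :inline.
def run_isolated
  if Process.respond_to?(:fork)
    pid = Process.fork do
      yield
      exit!(0) # leave the child without running inherited at_exit handlers
    end
    Process.wait(pid)
    $?.success? ? :forked : :child_failed
  else
    yield
    :inline
  end
rescue NotImplementedError
  # Some Ruby implementations advertise fork but raise when it is called.
  yield
  :inline
end

p run_isolated { 21 * 2 }
```

Like the worker, the child here leaves via `exit!` so that exit handlers inherited from the parent do not run twice.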
#hostname ⇒ Object
chomp’d hostname of this machine
# File 'lib/resque/worker.rb', line 489
def hostname
  @hostname ||= `hostname`.chomp
end
#idle? ⇒ Boolean
Boolean - true if idle, false if not
# File 'lib/resque/worker.rb', line 461
def idle?
  state == :idle
end
#inspect ⇒ Object
# File 'lib/resque/worker.rb', line 477
def inspect
  "#<Worker #{to_s}>"
end
#kill_child ⇒ Object
Kills the forked child immediately, without remorse. The job it is processing will not be completed.
# File 'lib/resque/worker.rb', line 290
def kill_child
  if @child
    log! "Killing child at #{@child}"
    if system("ps -o pid,state -p #{@child}")
      Process.kill("KILL", @child) rescue nil
    else
      log! "Child #{@child} not found, restarting."
      shutdown
    end
  end
end
#log(message) ⇒ Object
Log a message to STDOUT if we are verbose or very_verbose.
# File 'lib/resque/worker.rb', line 515
def log(message)
  if verbose
    puts "*** #{message}"
  elsif very_verbose
    time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
    puts "** [#{time}] #$$: #{message}"
  end
end
#log!(message) ⇒ Object
Logs a very verbose message to STDOUT.
# File 'lib/resque/worker.rb', line 525
def log!(message)
  log message if very_verbose
end
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one).
# File 'lib/resque/worker.rb', line 309
def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
end
#paused? ⇒ Boolean
Are we paused?
# File 'lib/resque/worker.rb', line 303
def paused?
  @paused
end
#perform(job) ⇒ Object
Processes a given job in the child.
# File 'lib/resque/worker.rb', line 161
def perform(job)
  begin
    run_hook :after_fork, job
    job.perform
  rescue Object => e
    log "#{job.inspect} failed: #{e.inspect}"
    begin
      job.fail(e)
    rescue Object => e
      log "Received exception when reporting failure: #{e.inspect}"
    end
    failed!
  else
    log "done: #{job.inspect}"
  ensure
    yield job if block_given?
  end
end
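The `rescue` / `else` / `ensure` layout means the caller's block receives the job whether it succeeded or failed. A reduced sketch of that control flow follows; `FakeJob` and `perform_job` are hypothetical names, and unlike the listing this sketch rescues only `StandardError`.

```ruby
# Hypothetical stand-in for a job; only #perform matters here.
FakeJob = Struct.new(:name, :work) do
  def perform
    work.call
  end
end

def perform_job(job, log)
  job.perform
rescue StandardError => e
  log << "#{job.name} failed: #{e.message}" # failure path
else
  log << "done: #{job.name}"                # success path only
ensure
  yield job if block_given?                 # runs on both paths
end

log = []
seen = []
perform_job(FakeJob.new('ok', -> { :fine }), log) { |j| seen << j.name }
perform_job(FakeJob.new('bad', -> { raise 'boom' }), log) { |j| seen << j.name }
p log, seen
```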
#pid ⇒ Object
Returns Integer PID of running worker
# File 'lib/resque/worker.rb', line 494
def pid
  @pid ||= to_s.split(":")[1].to_i
end
#process(j = nil, &block) ⇒ Object
DEPRECATED. Processes a single job. If none is given, it will try to produce one. Usually run in the child.
# File 'lib/resque/worker.rb', line 152
def process(j = nil, &block)
  return unless j ||= reserve
  working_on j
  perform(j, &block)
ensure
  done_working
end
#processed ⇒ Object
How many jobs has this worker processed? Returns an int.
# File 'lib/resque/worker.rb', line 416
def processed
  Stat["processed:#{self}"]
end
#processed! ⇒ Object
Tell Redis we’ve processed a job.
# File 'lib/resque/worker.rb', line 421
def processed!
  Stat << "processed"
  Stat << "processed:#{self}"
end
#processing ⇒ Object
Returns a hash explaining the Job we’re currently processing, if any.
# File 'lib/resque/worker.rb', line 451
def processing
  job || {}
end
#procline(string) ⇒ Object
Given a string, sets the procline ($0) and logs. Procline is always in the format of:
resque-VERSION: STRING
# File 'lib/resque/worker.rb', line 509
def procline(string)
  $0 = "resque-#{Resque::Version}: #{string}"
  log! $0
end
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from MongoDB.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and will therefore leave stale state information in MongoDB.
By checking the stored worker state against the actual environment, we can determine whether that state is stale and clean it up.
# File 'lib/resque/worker.rb', line 330
def prune_dead_workers
  all_workers = Worker.all
  known_workers = worker_pids unless all_workers.empty?
  all_workers.each do |worker|
    host, pid, queues = worker.to_s.split(':')
    next unless host == hostname
    next if known_workers.include?(pid)
    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
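The selection rule (same host, pid not among the live processes) can be isolated as a pure function. `dead_worker_ids` is a hypothetical helper written for this sketch, and the sample ids are made up.

```ruby
# Return the ids registered for `hostname` whose pid is not in `live_pids`.
# Ids use the "hostname:pid:queues" shape; pids are compared as strings,
# as in the listing above.
def dead_worker_ids(worker_ids, hostname, live_pids)
  worker_ids.select do |id|
    host, pid, _queues = id.split(':')
    host == hostname && !live_pids.include?(pid)
  end
end

registered = ['web1:100:critical', 'web1:200:low', 'web2:300:low']
p dead_worker_ids(registered, 'web1', ['100'])
```

Only `web1:200:low` is selected: pid 100 is still alive, and the `web2` worker belongs to another host and is left for that host to prune.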
#queues ⇒ Object
Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
# File 'lib/resque/worker.rb', line 201
def queues
  @queues[0] == "*" ? Resque.queues.sort : Resque.queues(@queues).sort
end
#register_signal_handlers ⇒ Object
Registers the various signal handlers a worker responds to.
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing.
USR1: Kill the forked child immediately, continue processing jobs.
USR2: Don’t process any new jobs.
CONT: Start processing jobs again after a USR2.
# File 'lib/resque/worker.rb', line 254
def register_signal_handlers
  trap('TERM') { shutdown! }
  trap('INT') { shutdown! }

  begin
    trap('QUIT') { shutdown }
    trap('USR1') { kill_child }
    trap('USR2') { pause_processing }
    trap('CONT') { unpause_processing }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log! "Registered signals"
end
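A reduced sketch of the pause and resume pair, using the same pattern of rescuing `ArgumentError` for unsupported signals. `PausableLoop` is a hypothetical class, not part of Resque, and it registers only USR2 and CONT.

```ruby
# Hypothetical flag holder standing in for a worker.
class PausableLoop
  attr_reader :paused

  def initialize
    @paused = false
  end

  def pause!
    @paused = true
  end

  def resume!
    @paused = false
  end

  # Returns the names of the signals that were actually registered.
  def register_signal_handlers
    { 'USR2' => :pause!, 'CONT' => :resume! }.select do |signal, action|
      begin
        Signal.trap(signal) { send(action) }
        true
      rescue ArgumentError
        # Raised on platforms that do not know this signal.
        warn "Signal #{signal} not supported."
        false
      end
    end.keys
  end
end

w = PausableLoop.new
p w.register_signal_handlers
```

On a platform that supports these signals, sending USR2 to the process (for example `kill -USR2 <pid>` from a shell) would set the paused flag, and CONT would clear it.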
#register_worker ⇒ Object
Registers ourself as a worker. Useful when entering the worker lifecycle on startup.
# File 'lib/resque/worker.rb', line 344
def register_worker
  mongo_workers.insert(:worker => self.to_s)
  started!
end
#reserve ⇒ Object
Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.
# File 'lib/resque/worker.rb', line 182
def reserve
  queues.each do |queue|
    log! "Checking #{queue}"
    if j = Resque::Job.reserve(queue)
      log! "Found job on #{queue}"
      return j
    end
  end

  nil
rescue Exception => e
  log "Error reserving job: #{e.inspect}"
  log e.backtrace.join("\n")
  raise e
end
#run_hook(name, *args) ⇒ Object
Runs a named hook, passing along any arguments.
# File 'lib/resque/worker.rb', line 350
def run_hook(name, *args)
  return unless hook = Resque.send(name)
  msg = "Running #{name} hook"
  msg << " with #{args.inspect}" if args.any?
  log msg

  args.any? ? hook.call(*args) : hook.call
end
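The lookup-then-call pattern can be tried in isolation. In this sketch the `Hooks` module is a hypothetical stand-in for wherever the hook procs are stored; it is not the Resque module, and logging is left out.

```ruby
# Hypothetical registry standing in for the hook accessors.
module Hooks
  class << self
    attr_accessor :before_fork, :after_fork
  end
end

def run_hook(name, *args)
  hook = Hooks.public_send(name)
  return unless hook # no hook registered under this name

  args.any? ? hook.call(*args) : hook.call
end

Hooks.before_fork = ->(job) { "about to fork for #{job}" }
p run_hook(:before_fork, 'job-1')
p run_hook(:after_fork, 'job-1')
```

The first call returns the hook's result; the second returns nil because no `after_fork` hook was set.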
#shutdown ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job.
# File 'lib/resque/worker.rb', line 272
def shutdown
  log 'Exiting...'
  @shutdown = true
end
#shutdown! ⇒ Object
Kill the child and shutdown immediately.
# File 'lib/resque/worker.rb', line 278
def shutdown!
  shutdown
  kill_child
end
#shutdown? ⇒ Boolean
Should this worker shutdown as soon as current job is finished?
# File 'lib/resque/worker.rb', line 284
def shutdown?
  @shutdown
end
#started ⇒ Object
What time did this worker start? Returns an instance of `Time`.
# File 'lib/resque/worker.rb', line 438
def started
  worker = mongo_workers.find_one(:worker => self.to_s)
  return nil if !worker
  worker['started']
end
#started! ⇒ Object
Tell MongoDB we’ve started.
# File 'lib/resque/worker.rb', line 445
def started!
  started = {'started' => Time.now }
  mongo_workers.update({:worker => self.to_s}, {'$set' => started})
end
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
# File 'lib/resque/worker.rb', line 226
def startup
  enable_gc_optimizations
  register_signal_handlers
  prune_dead_workers
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end
#state ⇒ Object
Returns a symbol representing the current worker state, which can be either :working or :idle
# File 'lib/resque/worker.rb', line 467
def state
  worker = mongo_workers.find_one(:worker => self.to_s)
  worker ? :working : :idle
end
#unpause_processing ⇒ Object
Start processing jobs again after a pause
# File 'lib/resque/worker.rb', line 315
def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
end
#unregister_worker ⇒ Object
Unregisters ourself as a worker. Useful when shutting down.
# File 'lib/resque/worker.rb', line 360
def unregister_worker
  # If we're still processing a job, make sure it gets logged as a
  # failure.
  if (hash = processing) && !hash.empty?
    j = Job.new(hash[:queue], hash[:payload])
    # Ensure the proper worker is attached to this job, even if
    # it's not the precise instance that died.
    j.worker = self
    j.fail(DirtyExit.new)
  end

  mongo_workers.remove(:worker => self.to_s)

  Stat.clear("processed:#{self}")
  Stat.clear("failed:#{self}")
end
#validate_queues ⇒ Object
A worker must be given a queue, otherwise it won’t know what to do with itself.
You probably never need to call this.
# File 'lib/resque/worker.rb', line 90
def validate_queues
  if @queues.nil? || @queues.empty?
    raise NoQueueError.new("Please give each worker at least one queue.")
  end
end
#work(interval = 5.0, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
- Startup: Signals are registered, dead workers are pruned, and this worker is registered.
- Work loop: Jobs are pulled from a queue and processed.
- Teardown: This worker is unregistered.
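Can be passed a float giving the polling interval in seconds, that is, how long the worker sleeps when no job is found. The default is 5 seconds, but for a semi-active site you may want to use a smaller value. An interval of 0 makes the worker exit as soon as no job is available.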
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.
# File 'lib/resque/worker.rb', line 112
def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  job_count = 0
  startup

  loop do
    break if shutdown?

    if not paused? and job = reserve
      log "got: #{job.inspect}"
      run_hook :before_fork, job
      working_on job

      if @child = fork
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        Process.wait
      else
        procline "Processing #{job.queue} since #{Time.now.to_s} (#{job_count} so far)"
        perform(job, &block)
        job_count += 1
        exit! unless @cant_fork
      end

      done_working
      @child = nil
    else
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end

ensure
  unregister_worker
end
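Stripped of forking, signals and persistence, the loop reduces to "take a job or sleep". The sketch below is a hypothetical, simplified `work` function over an in-memory array of callables, not the worker's real method. It shows why an interval of 0 together with a block is convenient in tests.

```ruby
# Drain an in-memory queue of callables. With interval 0 the loop exits
# as soon as the queue is empty instead of sleeping.
def work(queue, interval = 5.0)
  interval = Float(interval)
  job_count = 0
  loop do
    if (job = queue.shift)
      job.call                  # "process" the job
      job_count += 1
      yield job if block_given? # hand the finished job to the caller
    else
      break if interval.zero?
      sleep interval
    end
  end
  job_count
end

ran = []
count = work([-> { ran << :a }, -> { ran << :b }], 0) { |_job| ran << :after }
p count, ran
```

Each job runs before the block is called for it, so `ran` ends up as `[:a, :after, :b, :after]` and `count` is 2.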
#worker_pids ⇒ Object
Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
# File 'lib/resque/worker.rb', line 500
def worker_pids
  `ps -A -o pid,command | grep [r]esque | grep -v "resque-web"`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
#working? ⇒ Boolean
Boolean - true if working, false if not
# File 'lib/resque/worker.rb', line 456
def working?
  state == :working
end
#working_on(j) ⇒ Object
Given a job, tells MongoDB we’re working on it. Useful for seeing what workers are doing and when.
# File 'lib/resque/worker.rb', line 394
def working_on(j)
  j.worker = self
  data = { 'queue' => j.queue,
           'run_at' => Time.now.to_s,
           'payload' => check_payload(j.payload) }
  @job = data
  working_on = {'working_on' => data}
  mongo_workers.update({:worker => self.to_s}, {'$set' => working_on}, :upsert => true )
end