Class: Resque::Worker
- Inherits:
- Object
- Inheritance chain: Object, Resque::Worker
- Extended by:
- Helpers
- Includes:
- Helpers
- Defined in:
- lib/resque/worker.rb
Overview
A Resque Worker processes jobs. On platforms that support fork(2), the worker forks a child process to handle each job. This gives every job a clean slate and cuts down on gradual memory growth as well as low-level failures.
It also ensures workers are always listening to signals from you, their master, and can react accordingly.
Instance Attribute Summary
-
#cant_fork ⇒ Object
Boolean indicating whether this worker can or cannot fork.
-
#to_s ⇒ Object
(also: #id)
The string representation is the same as the id for this worker instance.
-
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT.
-
#very_verbose ⇒ Object
Whether the worker should log lots of info to STDOUT.
Class Method Summary
-
.all ⇒ Object
Returns an array of all worker objects.
-
.attach(worker_id) ⇒ Object
Alias of `find`.
-
.exists?(worker_id) ⇒ Boolean
Given a string worker id, return a boolean indicating whether the worker exists.
-
.find(worker_id) ⇒ Object
Returns a single worker object.
-
.working ⇒ Object
Returns an array of all worker objects currently processing jobs.
Instance Method Summary
-
#==(other) ⇒ Object
Is this worker the same as another worker?.
-
#done_working ⇒ Object
Called when we are done working - clears our `working_on` state and tells Redis we processed a job.
-
#enable_gc_optimizations ⇒ Object
Enables GC Optimizations if you’re running REE.
-
#failed ⇒ Object
How many failed jobs has this worker seen? Returns an int.
-
#failed! ⇒ Object
Tells Redis we’ve failed a job.
-
#fork ⇒ Object
Not every platform supports fork.
-
#hostname ⇒ Object
chomp’d hostname of this machine.
-
#idle? ⇒ Boolean
Boolean - true if idle, false if not.
-
#initialize(*queues) ⇒ Worker
constructor
Workers should be initialized with an array of string queue names.
- #inspect ⇒ Object
-
#job ⇒ Object
(also: #processing)
Returns a hash explaining the Job we’re currently processing, if any.
-
#kill_child ⇒ Object
Kills the forked child immediately, without remorse.
-
#log(message) ⇒ Object
Log a message to STDOUT if we are verbose or very_verbose.
-
#log!(message) ⇒ Object
Logs a very verbose message to STDOUT.
-
#process(job = nil) ⇒ Object
Processes a single job.
-
#processed ⇒ Object
How many jobs has this worker processed? Returns an int.
-
#processed! ⇒ Object
Tell Redis we’ve processed a job.
-
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
-
#queues ⇒ Object
Returns a list of queues to use when searching for a job.
-
#register_signal_handlers ⇒ Object
Registers the various signal handlers a worker responds to.
-
#register_worker ⇒ Object
Registers ourself as a worker.
-
#reserve ⇒ Object
Attempts to grab a job off one of the provided queues.
-
#shutdown ⇒ Object
Schedule this worker for shutdown.
-
#shutdown! ⇒ Object
Kill the child and shutdown immediately.
-
#started ⇒ Object
What time did this worker start? Returns the time string stored by `started!`.
-
#started! ⇒ Object
Tell Redis we’ve started.
-
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
-
#state ⇒ Object
Returns a symbol representing the current worker state, which can be either :working or :idle.
-
#unregister_worker ⇒ Object
Unregisters ourself as a worker.
-
#validate_queues ⇒ Object
A worker must be given a queue, otherwise it won’t know what to do with itself.
-
#work(interval = 5, &block) ⇒ Object
This is the main workhorse method.
-
#worker_pids ⇒ Object
Returns an array of string pids of all the other workers on this machine.
-
#working? ⇒ Boolean
Boolean - true if working, false if not.
-
#working_on(job) ⇒ Object
Given a job, tells Redis we’re working on it.
Methods included from Helpers
classify, constantize, decode, encode, redis
Constructor Details
#initialize(*queues) ⇒ Worker
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.
    # File 'lib/resque/worker.rb', line 75
    def initialize(*queues)
      @queues = queues
      validate_queues
    end
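The priority rule described for the constructor can be illustrated without Redis. The sketch below is a minimal in-memory model, not Resque's implementation: `PriorityPoller` and its hash-of-arrays store are hypothetical names introduced only for illustration.

```ruby
# Hypothetical in-memory model of queue priority; not part of Resque.
class PriorityPoller
  def initialize(*queue_names)
    @queue_names = queue_names
    @store = Hash.new { |hash, name| hash[name] = [] }
  end

  # Append a job to the named queue.
  def push(queue_name, job)
    @store[queue_name] << job
  end

  # Check queues in the order given at construction and return the
  # first job found, or nil when every queue is empty.
  def reserve
    @queue_names.each do |name|
      job = @store[name].shift
      return job if job
    end
    nil
  end
end

poller = PriorityPoller.new("critical", "default")
poller.push("default", "send_newsletter")
poller.push("critical", "charge_card")
poller.push("default", "rebuild_index")

order = []
while (job = poller.reserve)
  order << job
end
# The critical job comes out first even though it was enqueued second.
```

Because every call to `reserve` starts again from the first queue name, a steady stream of jobs on an early queue can delay later queues indefinitely in this model.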
Instance Attribute Details
#cant_fork ⇒ Object
Boolean indicating whether this worker can or cannot fork. Set automatically if a fork(2) call fails.
    # File 'lib/resque/worker.rb', line 21
    def cant_fork
      @cant_fork
    end
#to_s ⇒ Object Also known as: id
The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.
    # File 'lib/resque/worker.rb', line 378
    def to_s
      @to_s ||= "#{hostname}:#{Process.pid}:#{@queues.join(',')}"
    end
#verbose ⇒ Object
Whether the worker should log basic info to STDOUT
    # File 'lib/resque/worker.rb', line 14
    def verbose
      @verbose
    end
#very_verbose ⇒ Object
Whether the worker should log lots of info to STDOUT
    # File 'lib/resque/worker.rb', line 17
    def very_verbose
      @very_verbose
    end
Class Method Details
.all ⇒ Object
Returns an array of all worker objects.
    # File 'lib/resque/worker.rb', line 26
    def self.all
      redis.smembers(:workers).map { |id| find(id) }
    end
.attach(worker_id) ⇒ Object
Alias of `find`.
    # File 'lib/resque/worker.rb', line 54
    def self.attach(worker_id)
      find(worker_id)
    end
.exists?(worker_id) ⇒ Boolean
Given a string worker id, return a boolean indicating whether the worker exists
    # File 'lib/resque/worker.rb', line 60
    def self.exists?(worker_id)
      redis.sismember(:workers, worker_id)
    end
.find(worker_id) ⇒ Object
Returns a single worker object. Accepts a string id.
    # File 'lib/resque/worker.rb', line 42
    def self.find(worker_id)
      if exists? worker_id
        queues = worker_id.split(':')[-1].split(',')
        worker = new(*queues)
        worker.to_s = worker_id
        worker
      else
        nil
      end
    end
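To make the id format concrete, here is a small sketch of the round trip between the `hostname:pid:queues` string built by `#to_s` and the queue list that `.find` recovers from it. The helper names `build_worker_id` and `queues_from_worker_id` and the sample host are hypothetical; only the string operations are taken from the listings on this page.

```ruby
# Hypothetical helpers showing the "hostname:pid:queues" id layout.
def build_worker_id(hostname, pid, queues)
  "#{hostname}:#{pid}:#{queues.join(',')}"
end

# Recover the queue list the same way Worker.find does: take the text
# after the last colon and split it on commas.
def queues_from_worker_id(worker_id)
  worker_id.split(':')[-1].split(',')
end

id = build_worker_id("app1.example.com", 4242, %w[critical default])
queues = queues_from_worker_id(id)
```

One consequence of this parsing is that a queue name containing a colon would not survive the round trip, since only the text after the last colon is kept.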
.working ⇒ Object
Returns an array of all worker objects currently processing jobs.
    # File 'lib/resque/worker.rb', line 32
    def self.working
      names = all
      return [] unless names.any?
      names.map! { |name| "worker:#{name}" }
      redis.mapped_mget(*names).keys.map do |key|
        find key.sub("worker:", '')
      end
    end
Instance Method Details
#==(other) ⇒ Object
Is this worker the same as another worker?
    # File 'lib/resque/worker.rb', line 368
    def ==(other)
      to_s == other.to_s
    end
#done_working ⇒ Object
Called when we are done working - clears our `working_on` state and tells Redis we processed a job.
    # File 'lib/resque/worker.rb', line 308
    def done_working
      processed!
      redis.del("worker:#{self}")
    end
#enable_gc_optimizations ⇒ Object
Enables GC Optimizations if you’re running REE. www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
    # File 'lib/resque/worker.rb', line 208
    def enable_gc_optimizations
      if GC.respond_to?(:copy_on_write_friendly=)
        GC.copy_on_write_friendly = true
      end
    end
#failed ⇒ Object
How many failed jobs has this worker seen? Returns an int.
    # File 'lib/resque/worker.rb', line 325
    def failed
      Stat["failed:#{self}"]
    end
#failed! ⇒ Object
Tells Redis we’ve failed a job.
    # File 'lib/resque/worker.rb', line 330
    def failed!
      Stat << "failed"
      Stat << "failed:#{self}"
    end
#fork ⇒ Object
Not every platform supports fork. Here we do our magic to determine if yours does.
    # File 'lib/resque/worker.rb', line 185
    def fork
      @cant_fork = true if $TESTING

      return if @cant_fork

      begin
        Kernel.fork
      rescue NotImplementedError
        @cant_fork = true
        nil
      end
    end
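The detection logic can be tried in isolation. The following is a sketch under assumptions, not Resque's code: `ForkProbe` is a hypothetical class, and the injectable `forker` callable exists only so the unsupported-platform path can be exercised on a machine where fork(2) works.

```ruby
# Hypothetical wrapper; the forker is injectable so the fallback path
# can be exercised on platforms where fork(2) does work.
class ForkProbe
  attr_reader :cant_fork

  def initialize(forker = -> { Kernel.fork })
    @forker = forker
    @cant_fork = false
  end

  # Returns the child's pid in the parent, nil in the child, and nil
  # (after remembering the failure) when forking is unsupported.
  def fork
    return nil if @cant_fork
    @forker.call
  rescue NotImplementedError
    @cant_fork = true
    nil
  end
end

calls = 0
unsupported = ForkProbe.new(lambda do
  calls += 1
  raise NotImplementedError, "fork(2) unavailable"
end)

first_attempt = unsupported.fork   # rescued; flips cant_fork
second_attempt = unsupported.fork  # short-circuits without calling the forker
```

Remembering the failure means the probe pays for the exception once; every later call returns immediately, which is why a worker on such a platform can fall back to processing jobs in its own process.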
#hostname ⇒ Object
chomp’d hostname of this machine
    # File 'lib/resque/worker.rb', line 384
    def hostname
      @hostname ||= `hostname`.chomp
    end
#idle? ⇒ Boolean
Boolean - true if idle, false if not
    # File 'lib/resque/worker.rb', line 357
    def idle?
      state == :idle
    end
#inspect ⇒ Object
    # File 'lib/resque/worker.rb', line 372
    def inspect
      "#<Worker #{to_s}>"
    end
#job ⇒ Object Also known as: processing
Returns a hash explaining the Job we’re currently processing, if any.
    # File 'lib/resque/worker.rb', line 346
    def job
      decode(redis.get("worker:#{self}")) || {}
    end
#kill_child ⇒ Object
Kills the forked child immediately, without remorse. The job it is processing will not be completed.
    # File 'lib/resque/worker.rb', line 245
    def kill_child
      if @child
        log! "Killing child at #{@child}"
        if system("ps -ho pid,state -p #{@child}")
          Process.kill("KILL", @child) rescue nil
        else
          log! "Child #{@child} not found, restarting."
          shutdown
        end
      end
    end
#log(message) ⇒ Object
Log a message to STDOUT if we are verbose or very_verbose.
    # File 'lib/resque/worker.rb', line 397
    def log(message)
      if verbose
        puts "*** #{message}"
      elsif very_verbose
        time = Time.now.strftime('%I:%M:%S %Y-%m-%d')
        puts "** [#{time}] #$$: #{message}"
      end
    end
#log!(message) ⇒ Object
Logs a very verbose message to STDOUT.
    # File 'lib/resque/worker.rb', line 407
    def log!(message)
      log message if very_verbose
    end
#process(job = nil) ⇒ Object
Processes a single job. If none is given, it will try to produce one.
    # File 'lib/resque/worker.rb', line 144
    def process(job = nil)
      return unless job ||= reserve

      begin
        working_on job
        job.perform
      rescue Object => e
        log "#{job.inspect} failed: #{e.inspect}"
        job.fail(e)
        failed!
      else
        log "done: #{job.inspect}"
      ensure
        yield job if block_given?
        done_working
      end
    end
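The `rescue` / `else` / `ensure` structure of `#process` decides which bookkeeping runs for a successful job and which for a failed one. The sketch below traces that flow with hypothetical stand-ins (`SketchJob`, `ProcessSketch`, and an in-memory event list). One deliberate difference is labeled in the code: it rescues `StandardError`, whereas the listing above rescues `Object`.

```ruby
# Hypothetical stand-ins for a job and the worker's bookkeeping.
SketchJob = Struct.new(:name, :action) do
  def perform
    action.call
  end
end

class ProcessSketch
  attr_reader :events

  def initialize
    @events = []
  end

  def process(job)
    begin
      @events << "working_on #{job.name}"
      job.perform
    rescue StandardError => e
      # Runs only when perform raised. (The real method rescues Object.)
      @events << "failed #{job.name}: #{e.message}"
    else
      # Runs only when perform returned normally.
      @events << "done #{job.name}"
    ensure
      # Runs in both cases, so the working state is always cleared.
      yield job if block_given?
      @events << "done_working #{job.name}"
    end
  end
end

sketch = ProcessSketch.new
seen = []
sketch.process(SketchJob.new("ok", -> { :fine })) { |job| seen << job.name }
sketch.process(SketchJob.new("boom", -> { raise "kaput" })) { |job| seen << job.name }
```

In both runs the block is called and the final bookkeeping step is recorded, which mirrors how the real method yields the job and calls `done_working` whether or not the job failed.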
#processed ⇒ Object
How many jobs has this worker processed? Returns an int.
    # File 'lib/resque/worker.rb', line 314
    def processed
      Stat["processed:#{self}"]
    end
#processed! ⇒ Object
Tell Redis we’ve processed a job.
    # File 'lib/resque/worker.rb', line 319
    def processed!
      Stat << "processed"
      Stat << "processed:#{self}"
    end
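Both `processed!` and `failed!` bump two counters: a global one and one keyed by the worker's id. The sketch below models that pattern with a plain Hash. `StatSketch` is a hypothetical class, and its return values and zero default are assumptions of the sketch rather than documented behavior of Resque's `Stat`.

```ruby
# Hypothetical in-memory counterpart of the Stat calls used above.
class StatSketch
  def initialize
    @counters = Hash.new(0)
  end

  # Increment a counter, in the spirit of `Stat << name`.
  def <<(name)
    @counters[name] += 1
    self
  end

  # Read a counter; in this sketch unknown names read as zero.
  def [](name)
    @counters[name]
  end

  # Forget a counter, in the spirit of `Stat.clear(name)`.
  def clear(name)
    @counters.delete(name)
  end
end

stats = StatSketch.new
%w[web1:100:default web1:200:default web1:100:default].each do |worker_id|
  stats << "processed"               # global total
  stats << "processed:#{worker_id}"  # per-worker total
end
stats.clear("processed:web1:200:default")
```

Clearing a per-worker counter leaves the global total untouched, which matches how `unregister_worker` clears only the `processed:` and `failed:` entries for its own id.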
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and will therefore leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine whether the state in Redis is stale and clean it up.
    # File 'lib/resque/worker.rb', line 267
    def prune_dead_workers
      Worker.all.each do |worker|
        host, pid, queues = worker.id.split(':')
        next unless host == hostname
        next if worker_pids.include?(pid)
        log! "Pruning dead worker: #{worker}"
        worker.unregister_worker
      end
    end
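The pruning decision depends on three inputs: the ids registered in Redis, this machine's hostname, and the pids that are actually running. A sketch of that decision as a pure function follows; `stale_worker_ids` and the sample ids are hypothetical and exist only to show the filtering.

```ruby
# Hypothetical pure function: given the ids registered in Redis, this
# machine's hostname, and the pids that are actually alive, return the
# ids that should be unregistered.
def stale_worker_ids(registered_ids, hostname, live_pids)
  registered_ids.select do |id|
    host, pid, _queues = id.split(':')
    host == hostname && !live_pids.include?(pid)
  end
end

registered = [
  "web1:100:critical",   # same host, still running
  "web1:200:default",    # same host, process is gone
  "web2:300:default"     # other host, not ours to judge
]
stale = stale_worker_ids(registered, "web1", ["100", "999"])
```

Pids are compared as strings here because the id is split from a string and `worker_pids` is documented as returning string pids. Workers registered from other hosts are never pruned, since this machine cannot see their processes.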
#queues ⇒ Object
Returns a list of queues to use when searching for a job. A splat (“*”) means you want every queue (in alpha order) - this can be useful for dynamically adding new queues.
    # File 'lib/resque/worker.rb', line 179
    def queues
      @queues[0] == "*" ? Resque.queues.sort : @queues
    end
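Because the wildcard is expanded on every call rather than once at startup, a `*` worker picks up queues created later. The sketch below shows that behavior with a hypothetical `QueueList` class, where a callable stands in for `Resque.queues`.

```ruby
# Hypothetical model: `known_queues` is a callable that stands in for
# Resque.queues and is consulted on every call.
class QueueList
  def initialize(queues, known_queues)
    @queues = queues
    @known_queues = known_queues
  end

  def queues
    @queues[0] == "*" ? @known_queues.call.sort : @queues
  end
end

existing = %w[mail default]
wildcard = QueueList.new(["*"], -> { existing })
before = wildcard.queues
existing << "archive"            # a queue created after the worker started
after = wildcard.queues
fixed = QueueList.new(%w[mail default], -> { existing }).queues
```

The explicit list keeps the caller's order, which is what gives it priority semantics; the wildcard list is always alphabetical, so priority among wildcard queues follows their names.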
#register_signal_handlers ⇒ Object
Registers the various signal handlers a worker responds to.
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing.
USR1: Kill the forked child immediately, continue processing jobs.
    # File 'lib/resque/worker.rb', line 220
    def register_signal_handlers
      trap('TERM') { shutdown! }
      trap('INT') { shutdown! }
      unless defined? JRUBY_VERSION
        trap('QUIT') { shutdown }
        trap('USR1') { kill_child }
      end
      log! "Registered signals"
    end
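The difference between the graceful and immediate signals comes down to which methods each handler calls. The sketch below is a hypothetical stub (`SignalSketch`) that records those calls as flags. It defines a method that installs real traps with `Signal.trap`, but never calls it, so running the sketch leaves the process's signal handling alone.

```ruby
# Hypothetical worker stub that records what each signal would do.
class SignalSketch
  attr_reader :shutdown_requested, :child_killed

  def initialize
    @shutdown_requested = false
    @child_killed = false
  end

  def shutdown
    @shutdown_requested = true
  end

  def kill_child
    @child_killed = true
  end

  def shutdown!
    shutdown
    kill_child
  end

  # Signal name => action, mirroring the signal list above.
  def handlers
    {
      "TERM" => -> { shutdown! },
      "INT"  => -> { shutdown! },
      "QUIT" => -> { shutdown },
      "USR1" => -> { kill_child }
    }
  end

  # Install the handlers for real. Not called in this sketch.
  def register_signal_handlers
    handlers.each { |name, action| Signal.trap(name) { action.call } }
  end
end

graceful = SignalSketch.new
graceful.handlers["QUIT"].call    # finish the current job, then stop

immediate = SignalSketch.new
immediate.handlers["TERM"].call   # stop now and kill the child
```

Unlike this sketch, the listing above skips the QUIT and USR1 traps when `JRUBY_VERSION` is defined.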
#register_worker ⇒ Object
Registers ourself as a worker. Useful when entering the worker lifecycle on startup.
    # File 'lib/resque/worker.rb', line 279
    def register_worker
      redis.sadd(:workers, self)
      started!
    end
#reserve ⇒ Object
Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.
    # File 'lib/resque/worker.rb', line 164
    def reserve
      queues.each do |queue|
        log! "Checking #{queue}"
        if job = Resque::Job.reserve(queue)
          log! "Found job on #{queue}"
          return job
        end
      end

      nil
    end
#shutdown ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job.
    # File 'lib/resque/worker.rb', line 232
    def shutdown
      log 'Exiting...'
      @shutdown = true
    end
#shutdown! ⇒ Object
Kill the child and shutdown immediately.
    # File 'lib/resque/worker.rb', line 238
    def shutdown!
      shutdown
      kill_child
    end
#started ⇒ Object
What time did this worker start? Returns the time string written by `started!` (`Time.now.to_s`) as stored in Redis, not a `Time` instance.
    # File 'lib/resque/worker.rb', line 336
    def started
      redis.get "worker:#{self}:started"
    end
#started! ⇒ Object
Tell Redis we’ve started
    # File 'lib/resque/worker.rb', line 341
    def started!
      redis.set("worker:#{self}:started", Time.now.to_s)
    end
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle.
    # File 'lib/resque/worker.rb', line 199
    def startup
      enable_gc_optimizations
      register_signal_handlers
      prune_dead_workers
      register_worker
    end
#state ⇒ Object
Returns a symbol representing the current worker state, which can be either :working or :idle
    # File 'lib/resque/worker.rb', line 363
    def state
      redis.exists("worker:#{self}") ? :working : :idle
    end
#unregister_worker ⇒ Object
Unregisters ourself as a worker. Useful when shutting down.
    # File 'lib/resque/worker.rb', line 285
    def unregister_worker
      done_working

      redis.srem(:workers, self)
      redis.del("worker:#{self}:started")

      Stat.clear("processed:#{self}")
      Stat.clear("failed:#{self}")
    end
#validate_queues ⇒ Object
A worker must be given a queue, otherwise it won’t know what to do with itself.
You probably never need to call this.
    # File 'lib/resque/worker.rb', line 84
    def validate_queues
      if @queues.nil? || @queues.empty?
        raise NoQueueError.new("Please give each worker at least one queue.")
      end
    end
#work(interval = 5, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
- startup: Signals are registered, dead workers are pruned, and this worker is registered.
- work loop: Jobs are pulled from a queue and processed.
- teardown: This worker is unregistered.
Can be passed an integer representing the polling interval in seconds. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing.
    # File 'lib/resque/worker.rb', line 106
    def work(interval = 5, &block)
      $0 = "resque: Starting"
      startup

      loop do
        break if @shutdown

        if job = reserve
          log "got: #{job.inspect}"

          if @child = fork
            procline = "resque: Forked #{@child} at #{Time.now.to_i}"
            $0 = procline
            log! procline
            Process.wait
          else
            procline = "resque: Processing #{job.queue} since #{Time.now.to_i}"
            $0 = procline
            log! procline
            process(job, &block)
            exit! unless @cant_fork
          end

          @child = nil
        else
          break if interval.to_i == 0
          log! "Sleeping for #{interval.to_i}"
          $0 = "resque: Waiting for #{@queues.join(',')}"
          sleep interval.to_i
        end
      end

    ensure
      unregister_worker
    end
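The three life-cycle phases and the role of the interval can be traced with a stripped-down model. This is a sketch under assumptions: `LifecycleSketch` is hypothetical, it keeps jobs in an array, and it omits forking, Redis, and process titles entirely.

```ruby
# Hypothetical in-memory model of the life cycle; no forking, no Redis.
class LifecycleSketch
  attr_reader :events

  def initialize(jobs)
    @jobs = jobs.dup
    @events = []
    @shutdown = false
  end

  def shutdown
    @shutdown = true
  end

  def work(interval = 5)
    @events << :startup
    loop do
      break if @shutdown
      if (job = @jobs.shift)
        @events << [:processed, job]
        yield job if block_given?
      else
        # An interval of zero means "drain the queues, then exit".
        break if interval.to_i == 0
        sleep interval.to_i
      end
    end
  ensure
    # Teardown runs even if the loop exits through an exception.
    @events << :teardown
  end
end

sketch = LifecycleSketch.new(%w[a b])
completed = []
sketch.work(0) { |job| completed << job }
```

Passing `0` as the interval, as the real method also allows, makes the loop return once no job is found instead of sleeping, which is convenient in tests together with the completion block.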
#worker_pids ⇒ Object
Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
    # File 'lib/resque/worker.rb', line 390
    def worker_pids
      `ps -A -o pid,command | grep [r]esque`.split("\n").map do |line|
        line.split(' ')[0]
      end
    end
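The parsing half of `worker_pids` can be checked without shelling out. In the sketch below, `pids_from_ps` is a hypothetical helper and the sample text is made-up `ps` output, shaped like the process titles that `#work` assigns to `$0`.

```ruby
# Hypothetical helper: extract the pid column from `ps -o pid,command`
# style output that has already been filtered to Resque processes.
def pids_from_ps(ps_output)
  ps_output.split("\n").map { |line| line.split(' ')[0] }
end

sample = " 4242 resque: Waiting for critical,default\n" \
         " 4310 resque: Forked 4311 at 1262304000\n"
pids = pids_from_ps(sample)
```

Splitting on a single space ignores the leading padding that `ps` adds to right-align pids. The `[r]esque` pattern in the real command matches the text `resque` but not grep's own command line, which contains the literal `[r]esque`.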
#working? ⇒ Boolean
Boolean - true if working, false if not
    # File 'lib/resque/worker.rb', line 352
    def working?
      state == :working
    end
#working_on(job) ⇒ Object
Given a job, tells Redis we’re working on it. Useful for seeing what workers are doing and when.
    # File 'lib/resque/worker.rb', line 297
    def working_on(job)
      job.worker = self
      data = encode \
        :queue => job.queue,
        :run_at => Time.now.to_s,
        :payload => job.payload
      redis.set("worker:#{self}", data)
    end
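Taken together, `working_on`, `done_working`, `job`, and `state` all revolve around a single key, `worker:<id>`. The sketch below models that key with a Hash in place of Redis and JSON in place of the `encode` / `decode` helpers. Both substitutions, and the `WorkingStateSketch` class itself, are assumptions made for illustration.

```ruby
require "json"

# Hypothetical stand-in: a Hash plays the role of Redis, and JSON
# stands in for the encode / decode helpers.
class WorkingStateSketch
  def initialize(id, store = {})
    @id = id
    @store = store
  end

  def working_on(queue, payload)
    data = { "queue" => queue, "run_at" => Time.now.to_s, "payload" => payload }
    @store["worker:#{@id}"] = JSON.generate(data)
  end

  def done_working
    @store.delete("worker:#{@id}")
  end

  # The job currently recorded for this worker, or {} when idle.
  def job
    raw = @store["worker:#{@id}"]
    raw ? JSON.parse(raw) : {}
  end

  # The presence of the key is the whole state machine.
  def state
    @store.key?("worker:#{@id}") ? :working : :idle
  end
end

worker = WorkingStateSketch.new("web1:100:default")
state_before = worker.state
worker.working_on("default", { "class" => "Archive", "args" => [42] })
state_during = worker.state
current_queue = worker.job["queue"]
worker.done_working
state_after = worker.state
```

Since the state is derived from whether the key exists, a worker that dies mid-job leaves the key behind and keeps looking busy until something removes it, which is the stale state that `prune_dead_workers` is described as cleaning up.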