Class: Resque::Worker
- Inherits: Object
  - Object
  - Resque::Worker
- Defined in: lib/resque_ui/overrides/resque/worker.rb
Class Method Summary
- .start(ips, queues) ⇒ Object
- .working ⇒ Object
  The logic for mapped_mget changed: in the latest redis gem it returns keys with nil values.
Instance Method Summary
- #all_workers_in_pid_working ⇒ Object
- #continue ⇒ Object
- #ip ⇒ Object
- #local_ip ⇒ Object
- #pause ⇒ Object
- #pause_key ⇒ Object
  When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion.
- #pause_processing ⇒ Object
  Stop processing jobs after the current one has completed (if we’re currently running one).
- #paused ⇒ Object
- #paused? ⇒ Boolean
  Are we paused? OVERRIDE so the UI can tell whether we’re paused.
- #prune_dead_workers ⇒ Object
  Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
- #queue ⇒ Object
- #queues ⇒ Object
  OVERRIDE for multithreaded workers.
- #queues_in_pid ⇒ Object
- #quit ⇒ Object
- #restart ⇒ Object
- #shutdown ⇒ Object
  Schedule this worker for shutdown.
- #startup ⇒ Object
  Runs all the methods needed when a worker begins its lifecycle.
- #status ⇒ Object
- #status=(status) ⇒ Object
- #thread ⇒ Object
- #to_s ⇒ Object (also: #id)
  The string representation is the same as the id for this worker instance.
- #unpause_processing ⇒ Object
  Start processing jobs again after a pause. OVERRIDE to remove the Redis key so the UI knows it’s unpaused too. Would prefer to call super, but that raises a no-superclass-method error.
- #unregister_worker ⇒ Object
  Unregisters ourself as a worker.
- #work(interval = 5, &block) ⇒ Object
  This is the main workhorse method.
- #workers_in_pid ⇒ Object
Class Method Details
.start(ips, queues) ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 234
    def self.start(ips, queues)
      if Rails.env =~ /development|test/
        Thread.new(queues) { |queue| system("rake RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work") }
      else
        Thread.new(queues, ips) { |queue, ip_list| system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:work host=#{ip_list} queue=#{queue}") }
      end
    end
.working ⇒ Object
The logic for mapped_mget changed: in the latest redis gem it returns keys with nil values, so those entries are skipped here.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 216
    def self.working
      names = all
      return [] unless names.any?
      names.map! { |name| "worker:#{name}" }
      redis.mapped_mget(*names).map do |key, value|
        find key.sub("worker:", '') unless value.nil?
      end.compact
    end
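The nil filtering in .working can be illustrated without a Redis connection. The following is a minimal sketch, assuming a plain Hash stands in for the result of mapped_mget; the worker ids and the helper name working_ids are hypothetical, and the sketch returns id strings where the real method calls find.

```ruby
# Hypothetical stand-in for the Hash returned by redis.mapped_mget:
# idle workers show up as keys whose value is nil.
mget_result = {
  "worker:host-a(10.0.0.1):100:70001:high" => '{"queue":"high"}',
  "worker:host-a(10.0.0.1):100:70002:low"  => nil
}

# Same map/compact shape as .working, but it returns the worker id
# string instead of looking the worker up with find.
def working_ids(mapped)
  mapped.map do |key, value|
    key.sub("worker:", "") unless value.nil?
  end.compact
end

puts working_ids(mget_result).inspect
```

Only the first id survives, because the second key maps to nil and is dropped by compact.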
Instance Method Details
#all_workers_in_pid_working ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 153
    def all_workers_in_pid_working
      workers_in_pid.select { |w| (hash = w.processing) && !hash.empty? }
    end
#continue ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 265
    def continue
      if Rails.env =~ /development|test/
        system("kill -CONT #{self.pid}")
      else
        system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:continue_worker pid=#{self.pid} host=#{self.ip}")
      end
    end
#ip ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 43
    def ip
      to_s.split(':').first[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
    end
#local_ip ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 6
    def local_ip
      orig, Socket.do_not_reverse_lookup = Socket.do_not_reverse_lookup, true # turn off reverse DNS resolution temporarily

      UDPSocket.open do |s|
        s.connect '64.233.187.99', 1
        s.addr.last
      end
    ensure
      Socket.do_not_reverse_lookup = orig
    end
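The UDP trick used by #local_ip may not be obvious: connecting a UDP socket sends no packets, it only makes the operating system choose the local interface that would route to the target. A minimal sketch, assuming a hypothetical helper local_ip_via whose target parameter replaces the hard-coded public address so the demo can run against loopback without network access:

```ruby
require "socket"

# target is a hypothetical parameter; the original method hard-codes a
# public address. Connecting a UDP socket sends nothing on the wire, it
# only binds the socket to the interface that routes to the target.
def local_ip_via(target)
  orig = Socket.do_not_reverse_lookup
  Socket.do_not_reverse_lookup = true # skip reverse DNS while we look
  UDPSocket.open do |s|
    s.connect(target, 1)
    s.addr.last # numeric address of the chosen local interface
  end
ensure
  Socket.do_not_reverse_lookup = orig
end

# Loopback keeps the demo independent of network access.
puts local_ip_via("127.0.0.1")
```

With a routable public target, the same call would report the address of the outward-facing interface instead.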
#pause ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 257
    def pause
      if Rails.env =~ /development|test/
        system("kill -USR2 #{self.pid}")
      else
        system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:pause_worker pid=#{self.pid} host=#{self.ip}")
      end
    end
#pause_key ⇒ Object
When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion, so the pause key is built from the host and pid portions only.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 26
    def pause_key
      key = to_s.split(':')
      "worker:#{key.first}:#{key.second}:all_workers:paused"
    end
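To see why the key ignores the thread and queue portions, consider two worker ids from the same process. This is a minimal sketch in plain Ruby; the helper name pause_key_for and the sample ids are hypothetical, and key[1] stands in for ActiveSupport's Array#second so the code runs without Rails.

```ruby
# Plain-Ruby equivalent of pause_key: key[1] replaces ActiveSupport's
# Array#second so the sketch runs outside Rails.
def pause_key_for(worker_id)
  key = worker_id.split(":")
  "worker:#{key[0]}:#{key[1]}:all_workers:paused"
end

# Two hypothetical ids from the same process, different threads and queues.
first_thread  = "host-a(10.0.0.1):4242:70001:high"
second_thread = "host-a(10.0.0.1):4242:70002:low,default"

puts pause_key_for(first_thread)
puts pause_key_for(first_thread) == pause_key_for(second_thread)
```

Both ids map to the same key, so every thread in a pid shares one paused flag.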
#pause_processing ⇒ Object
Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a Redis key so the UI knows it’s paused too. Would prefer to call super, but that raises a no-superclass-method error.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 95
    def pause_processing
      log "USR2 received; pausing job processing"
      @paused = true
      redis.set(pause_key, Time.now.to_s)
    end
#paused ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 81
    def paused
      redis.get pause_key
    end
#paused? ⇒ Boolean
Are we paused? OVERRIDE so the UI can tell whether we’re paused.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 87
    def paused?
      @paused || paused.present?
    end
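The reason for checking both the instance flag and the stored key is that the UI process holds a different Worker object than the one that received the signal. A minimal sketch of that interaction, assuming a plain Hash stands in for Redis; the class name PauseSketch is hypothetical, and to_s.empty? only approximates ActiveSupport's present?.

```ruby
# Hypothetical model of the pause flag: a Hash plays the role of Redis.
class PauseSketch
  def initialize(store, pause_key)
    @store = store
    @pause_key = pause_key
    @paused = false
  end

  # Mirrors pause_processing: set the local flag and the shared key.
  def pause_processing
    @paused = true
    @store[@pause_key] = Time.now.to_s
  end

  # Mirrors unpause_processing: clear the local flag and the shared key.
  def unpause_processing
    @paused = false
    @store.delete(@pause_key)
  end

  # Local flag OR shared key; to_s.empty? approximates present?.
  def paused?
    @paused || !@store[@pause_key].to_s.empty?
  end
end

store  = {}
worker = PauseSketch.new(store, "worker:host-a:4242:all_workers:paused")
ui     = PauseSketch.new(store, "worker:host-a:4242:all_workers:paused")

worker.pause_processing
puts ui.paused?   # the UI-side object never had its own flag set
worker.unpause_processing
puts ui.paused?
```

The UI-side object reports the paused state purely from the shared key, which is what the override is for.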
#prune_dead_workers ⇒ Object
Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.
This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and will therefore leave stale state information in Redis.
By checking the current Redis state against the actual environment, we can determine whether the state in Redis is stale and clean it up a bit.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 120
    def prune_dead_workers
      Worker.all.each do |worker|
        host, pid, thread, queues = worker.id.split(':')
        next unless host.include?(hostname)
        next if worker_pids.include?(pid)
        log! "Pruning dead worker: #{worker}"
        worker.unregister_worker
      end
    end
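The selection rule can be tried on plain data. This is a minimal sketch, assuming registered ids, the local hostname, and the list of live pids are passed in as arguments; the helper name dead_worker_ids and all sample values are hypothetical. Pids are compared as strings, since they come from splitting the id.

```ruby
# Returns the ids that would be pruned: registered on this host, but
# with a pid that is no longer running. live_pids holds pid strings.
def dead_worker_ids(ids, hostname, live_pids)
  ids.select do |id|
    host, pid, _thread, _queues = id.split(":")
    host.include?(hostname) && !live_pids.include?(pid)
  end
end

registered = [
  "web1(10.0.0.1):100:70001:high",  # this host, pid still alive
  "web1(10.0.0.1):200:70002:low",   # this host, pid gone
  "web2(10.0.0.2):300:70003:high"   # another host, never touched
]

puts dead_worker_ids(registered, "web1", ["100"]).inspect
```

Workers registered from other hosts are left alone even when their pid is unknown locally.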
#queue ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 35
    def queue
      to_s.split(':').last
    end
#queues ⇒ Object
OVERRIDE for multithreaded workers.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 52
    def queues
      Thread.current[:queues] == "*" ? Resque.queues.sort : Thread.current[:queues].split(',')
    end
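The queue specification is either the wildcard "*" or a comma-separated list. A minimal sketch of that resolution, assuming the known queues are passed in explicitly; the helper name resolve_queues and its all_queues parameter are hypothetical stand-ins for the thread-local value and Resque.queues.

```ruby
# spec plays the role of Thread.current[:queues]; all_queues plays the
# role of Resque.queues.
def resolve_queues(spec, all_queues)
  spec == "*" ? all_queues.sort : spec.split(",")
end

puts resolve_queues("*", %w[low high default]).inspect
puts resolve_queues("high,low", %w[low high default]).inspect
```

The wildcard yields every known queue in sorted order, while an explicit list keeps the order in which it was written.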
#queues_in_pid ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 47
    def queues_in_pid
      workers_in_pid.collect { |w| w.queue }
    end
#quit ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 242
    def quit
      if Rails.env =~ /development|test/
        if RUBY_PLATFORM =~ /java/
          #jruby doesn't trap the -QUIT signal
          #-TERM gracefully kills the main pid and does a -9 on the child if there is one.
          #Since jruby doesn't fork a child, the main worker is gracefully killed.
          system("kill -TERM #{self.pid}")
        else
          system("kill -QUIT #{self.pid}")
        end
      else
        system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:quit_worker pid=#{self.pid} host=#{self.ip}")
      end
    end
#restart ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 273
    def restart
      queues = self.queues_in_pid.join('#')
      quit
      self.class.start(self.ip, queues)
    end
#shutdown ⇒ Object
Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 75
    def shutdown
      log 'Exiting...'
      Thread.list.each { |t| t[:shutdown] = true }
      @shutdown = true
    end
#startup ⇒ Object
Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 58
    def startup
      enable_gc_optimizations
      if Thread.current == Thread.main
        register_signal_handlers
        prune_dead_workers
      end
      run_hook :before_first_fork
      register_worker

      # Fix buffering so we can `rake resque:work > resque.log` and
      # get output from the child in there.
      $stdout.sync = true
    end
#status ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 230
    def status
      job['status']
    end
#status=(status) ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 225
    def status=(status)
      data = encode(job.merge('status' => status))
      redis.set("worker:#{self}", data)
    end
#thread ⇒ Object
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 31
    def thread
      to_s.split(':').third
    end
#to_s ⇒ Object Also known as: id
The string representation is the same as the id for this worker instance. Can be used with Worker.find.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 19
    def to_s
      @to_s || "#{hostname}(#{local_ip}):#{Process.pid}:#{Thread.current.object_id}:#{Thread.current[:queues]}"
    end
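The id therefore has the shape host(ip):pid:thread:queues, and the #ip, #thread, and #queue accessors each pick one portion back out of it. A minimal sketch of that decomposition in plain Ruby; the WorkerId struct, the parse_worker_id helper, and the sample id are hypothetical and not part of the class.

```ruby
# Hypothetical value object for the portions of a worker id.
WorkerId = Struct.new(:host, :ip, :pid, :thread, :queues)

# Splits "host(ip):pid:thread:queues" into its portions. The IP regex is
# the one used by #ip.
def parse_worker_id(id)
  host_part, pid, thread, queues = id.split(":")
  ip   = host_part[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
  host = host_part.sub(/\(.*\)\z/, "")
  WorkerId.new(host, ip, pid.to_i, thread, queues.split(","))
end

parsed = parse_worker_id("web1(192.168.0.5):4242:70001:high,low")
puts parsed.host
puts parsed.ip
puts parsed.queues.inspect
```

Splitting on ":" is safe here only because none of the portions in the assumed format contains a colon.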
#unpause_processing ⇒ Object
Start processing jobs again after a pause. OVERRIDE to remove the Redis key so the UI knows it’s unpaused too. Would prefer to call super, but that raises a no-superclass-method error.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 104
    def unpause_processing
      log "CONT received; resuming job processing"
      @paused = false
      redis.del(pause_key)
    end
#unregister_worker ⇒ Object
Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key. Would prefer to call super, but that raises a no-superclass-method error.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 133
    def unregister_worker
      # If we're still processing a job, make sure it gets logged as a
      # failure.
      if (hash = processing) && !hash.empty?
        job = Job.new(hash['queue'], hash['payload'])
        # Ensure the proper worker is attached to this job, even if
        # it's not the precise instance that died.
        job.worker = self
        job.fail(DirtyExit.new)
      end

      redis.srem(:workers, self)
      redis.del("worker:#{self}")
      redis.del("worker:#{self}:started")
      redis.del(pause_key)

      Stat.clear("processed:#{self}")
      Stat.clear("failed:#{self}")
    end
#work(interval = 5, &block) ⇒ Object
This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.
The following events occur during a worker’s life cycle:
- Startup: Signals are registered, dead workers are pruned, and this worker is registered.
- Work loop: Jobs are pulled from a queue and processed.
- Teardown: This worker is unregistered.
Can be passed an integer giving the polling interval in seconds. The default is 5, but for a semi-active site you may want to use a smaller value.
Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers.
    # File 'lib/resque_ui/overrides/resque/worker.rb', line 174
    def work(interval = 5, &block)
      $0 = "resque: Starting"
      startup

      loop do
        break if @shutdown || Thread.current[:shutdown]

        if not @paused and job = reserve
          log "got: #{job.inspect}"
          run_hook :before_fork
          working_on job

          if @child = fork
            rand # Reseeding
            procline "Forked #{@child} at #{Time.now.to_i}"
            Process.wait
          else
            procline "Processing #{job.queue} since #{Time.now.to_i}"
            perform(job, &block)
            exit! unless @cant_fork
          end

          done_working
          @child = nil
        else
          break if interval.to_i == 0
          log! "Sleeping for #{interval.to_i}"
          procline @paused ? "Paused" : "Waiting for #{@queues.join(',')}"
          sleep interval.to_i
        end
      end
      unregister_worker rescue nil
      loop do
        #hang onto the process until all threads are done
        break if all_workers_in_pid_working.blank?
        sleep interval.to_i
      end
    ensure
      unregister_worker
    end
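The startup, work loop, and teardown phases can be traced with a stripped-down model. This is a minimal sketch, not the worker itself: the class name SketchWorker is hypothetical, an in-memory Array stands in for the Redis queue, and forking, signal handling, and the multithreaded wait are all left out. As in the real method, an interval of 0 ends the loop once no job is available.

```ruby
# Hypothetical single-threaded model of the worker life cycle.
class SketchWorker
  attr_reader :events

  def initialize(jobs)
    @jobs = jobs.dup   # in-memory stand-in for the queue
    @paused = false
    @shutdown = false
    @events = []
  end

  def work(interval = 0)
    @events << :startup
    loop do
      break if @shutdown
      if !@paused && (job = @jobs.shift)
        @events << [:performed, job]
        yield job if block_given?   # block receives the finished job
      else
        break if interval.to_i == 0 # interval 0: drain the queue and stop
        sleep interval.to_i
      end
    end
  ensure
    @events << :teardown            # runs even if a job raises
  end
end

worker = SketchWorker.new(%w[job-1 job-2])
worker.work(0) { |job| puts "done: #{job}" }
puts worker.events.inspect
```

Passing 0 as the interval is convenient in tests, since the call returns as soon as the queue is empty instead of polling forever.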