Class: Resqued::Master
- Inherits:
-
Object
- Object
- Resqued::Master
- Includes:
- Logging, Pidfile, ProclineVersion, Sleepy
- Defined in:
- lib/resqued/master.rb
Overview
The master process.
-
Spawns a listener.
-
Tracks all work. (IO pipe from listener.)
-
Handles signals.
Constant Summary collapse
- SIGNALS =
[ :HUP, :INT, :USR2, :CONT, :TERM, :QUIT ]
- OPTIONAL_SIGNALS =
[ :INFO ]
- OTHER_SIGNALS =
[:CHLD, 'EXIT']
- TRAPS =
SIGNALS + OPTIONAL_SIGNALS + OTHER_SIGNALS
- SIGNAL_QUEUE =
[]
Instance Method Summary collapse
-
#all_listeners ⇒ Object
Private: All the ListenerProxy objects.
-
#dump_object_counts ⇒ Object
Private.
-
#go_ham ⇒ Object
Private: dat main loop.
-
#initialize(options) ⇒ Master
constructor
A new instance of Master.
- #install_signal_handlers ⇒ Object
- #kill_all_listeners(signal) ⇒ Object
- #kill_listener(signal, listener) ⇒ Object
-
#listener_pids ⇒ Object
Private: Map listener pids to ListenerProxy objects.
-
#listener_running(listener) ⇒ Object
Listener message: A listener finished booting, and is ready to start workers.
- #listener_status(listener, status) ⇒ Object
- #next_listener_id ⇒ Object
- #no_more_unexpected_exits ⇒ Object
-
#prepare_new_listener ⇒ Object
Private: Spin up a new listener.
- #read_listeners ⇒ Object
- #reap_all_listeners(waitpid_flags = 0) ⇒ Object
- #report_unexpected_exits ⇒ Object
-
#run(ready_pipe = nil) ⇒ Object
Public: Starts the master process.
- #start_listener ⇒ Object
- #status_message(type, pid, status) ⇒ Object
- #wait_for_workers ⇒ Object
-
#worker_finished(pid) ⇒ Object
Listener message: A worker just stopped working.
-
#worker_started(pid) ⇒ Object
Listener message: A worker just started working.
- #worker_status(pid, status) ⇒ Object
- #write_procline ⇒ Object
- #yawn(duration) ⇒ Object
Methods included from Sleepy
Methods included from ProclineVersion
Methods included from Pidfile
#remove_pidfile, #with_pidfile, #write_pidfile
Methods included from Logging
build_logger, close_log, #log, log_file, log_file=, #log_to_stdout?, logger, logging_io, #reopen_logs
Constructor Details
#initialize(options) ⇒ Master
Returns a new instance of Master.
19 20 21 22 23 24 25 26 |
# File 'lib/resqued/master.rb', line 19 def initialize() @config_paths = .fetch(:config_paths) @pidfile = .fetch(:master_pidfile) { nil } @status_pipe = .fetch(:status_pipe) { nil } @fast_exit = .fetch(:fast_exit) { false } @listener_backoff = Backoff.new @listeners_created = 0 end |
Instance Method Details
#all_listeners ⇒ Object
Private: All the ListenerProxy objects.
109 110 111 |
# File 'lib/resqued/master.rb', line 109 def all_listeners listener_pids.values end |
#dump_object_counts ⇒ Object
Private.
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/resqued/master.rb', line 77 def dump_object_counts log GC.stat.inspect counts = {} total = 0 ObjectSpace.each_object do |o| count = counts[o.class.name] || 0 counts[o.class.name] = count + 1 total += 1 end top = 10 log "#{total} objects. top #{top}:" counts.sort_by { |name, count| count }.reverse.each_with_index do |(name, count), i| if i < top diff = "" if last = @last_counts && @last_counts[name] diff = " (#{'%+d' % (count - last)})" end log " #{count} #{name}#{diff}" end end @last_counts = counts log GC.stat.inspect rescue => e log "Error while counting objects: #{e}" end |
#go_ham ⇒ Object
Private: dat main loop.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/resqued/master.rb', line 44 def go_ham loop do read_listeners reap_all_listeners(Process::WNOHANG) start_listener unless @paused case signal = SIGNAL_QUEUE.shift when nil yawn(@listener_backoff.how_long? || 30.0) when :INFO dump_object_counts when :HUP reopen_logs log "Restarting listener with new configuration and application." prepare_new_listener when :USR2 log "Pause job processing" @paused = true kill_listener(:QUIT, @current_listener) @current_listener = nil when :CONT log "Resume job processing" @paused = false kill_all_listeners(:CONT) when :INT, :TERM, :QUIT log "Shutting down..." kill_all_listeners(signal) wait_for_workers unless @fast_exit break end end end |
#install_signal_handlers ⇒ Object
225 226 227 228 229 |
# File 'lib/resqued/master.rb', line 225 def install_signal_handlers trap(:CHLD) { awake } SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal ; awake } } OPTIONAL_SIGNALS.each { |signal| trap(signal) { SIGNAL_QUEUE << signal ; awake } rescue nil } end |
#kill_all_listeners(signal) ⇒ Object
183 184 185 186 187 |
# File 'lib/resqued/master.rb', line 183 def kill_all_listeners(signal) all_listeners.each do |l| l.kill(signal) end end |
#kill_listener(signal, listener) ⇒ Object
179 180 181 |
# File 'lib/resqued/master.rb', line 179 def kill_listener(signal, listener) listener.kill(signal) if listener end |
#listener_pids ⇒ Object
Private: Map listener pids to ListenerProxy objects.
104 105 106 |
# File 'lib/resqued/master.rb', line 104 def listener_pids @listener_pids ||= {} end |
#listener_running(listener) ⇒ Object
Listener message: A listener finished booting, and is ready to start workers.
Promotes a booting listener to be the current listener.
151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/resqued/master.rb', line 151 def listener_running(listener) listener_status(listener, 'ready') if listener == @current_listener kill_listener(:QUIT, @last_good_listener) @last_good_listener = nil else # This listener didn't receive the last SIGQUIT we sent. # (It was probably sent before the listener had set up its traps.) # So kill it again. We have moved on. kill_listener(:QUIT, listener) end end |
#listener_status(listener, status) ⇒ Object
254 255 256 257 258 |
# File 'lib/resqued/master.rb', line 254 def listener_status(listener, status) if listener && listener.pid ('listener', listener.pid, status) end end |
#next_listener_id ⇒ Object
123 124 125 |
# File 'lib/resqued/master.rb', line 123 def next_listener_id @listeners_created += 1 end |
#no_more_unexpected_exits ⇒ Object
242 243 244 |
# File 'lib/resqued/master.rb', line 242 def no_more_unexpected_exits trap('EXIT', 'DEFAULT') end |
#prepare_new_listener ⇒ Object
Private: Spin up a new listener.
The old one will be killed when the new one is ready for workers.
167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/resqued/master.rb', line 167 def prepare_new_listener if @last_good_listener # The last_good_listener is still running because we got another HUP before the new listener finished booting. # Keep the last_good_listener (where all the workers are) and kill the booting current_listener. We'll start a new one. kill_listener(:QUIT, @current_listener) else @last_good_listener = @current_listener end # Indicate to `start_listener` that it should start a new listener. @current_listener = nil end |
#read_listeners ⇒ Object
127 128 129 130 131 |
# File 'lib/resqued/master.rb', line 127 def read_listeners all_listeners.each do |l| l.read_worker_status(:on_activity => self) end end |
#reap_all_listeners(waitpid_flags = 0) ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/resqued/master.rb', line 193 def reap_all_listeners(waitpid_flags = 0) begin lpid, status = Process.waitpid2(-1, waitpid_flags) if lpid log "Listener exited #{status}" if @current_listener && @current_listener.pid == lpid @listener_backoff.died @current_listener = nil end if @last_good_listener && @last_good_listener.pid == lpid @last_good_listener = nil end dead_listener = listener_pids.delete(lpid) listener_status dead_listener, 'stop' dead_listener.dispose write_procline else return end rescue Errno::ECHILD return end while true end |
#report_unexpected_exits ⇒ Object
231 232 233 234 235 236 237 238 239 240 |
# File 'lib/resqued/master.rb', line 231 def report_unexpected_exits trap('EXIT') do log("EXIT #{$!.inspect}") if $! $!.backtrace.each do |line| log(line) end end end end |
#run(ready_pipe = nil) ⇒ Object
Public: Starts the master process.
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/resqued/master.rb', line 29 def run(ready_pipe = nil) report_unexpected_exits with_pidfile(@pidfile) do write_procline install_signal_handlers if ready_pipe ready_pipe.syswrite($$.to_s) ready_pipe.close rescue nil end go_ham end no_more_unexpected_exits end |
#start_listener ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/resqued/master.rb', line 113 def start_listener return if @current_listener || @listener_backoff.wait? @current_listener = ListenerProxy.new(:config_paths => @config_paths, :old_workers => all_listeners.map { |l| l.running_workers }.flatten, :listener_id => next_listener_id) @current_listener.run listener_status @current_listener, 'start' @listener_backoff.started listener_pids[@current_listener.pid] = @current_listener write_procline end |
#status_message(type, pid, status) ⇒ Object
264 265 266 267 268 |
# File 'lib/resqued/master.rb', line 264 def (type, pid, status) if @status_pipe @status_pipe.write("#{type},#{pid},#{status}\n") end end |
#wait_for_workers ⇒ Object
189 190 191 |
# File 'lib/resqued/master.rb', line 189 def wait_for_workers reap_all_listeners end |
#worker_finished(pid) ⇒ Object
Listener message: A worker just stopped working.
Forwards the message to the other listeners.
141 142 143 144 145 146 |
# File 'lib/resqued/master.rb', line 141 def worker_finished(pid) worker_status(pid, 'stop') all_listeners.each do |other| other.worker_finished(pid) end end |
#worker_started(pid) ⇒ Object
Listener message: A worker just started working.
134 135 136 |
# File 'lib/resqued/master.rb', line 134 def worker_started(pid) worker_status(pid, 'start') end |
#worker_status(pid, status) ⇒ Object
260 261 262 |
# File 'lib/resqued/master.rb', line 260 def worker_status(pid, status) ('worker', pid, status) end |
#write_procline ⇒ Object
250 251 252 |
# File 'lib/resqued/master.rb', line 250 def write_procline $0 = "#{procline_version} master [gen #{@listeners_created}] [#{listener_pids.size} running] #{ARGV.join(' ')}" end |
#yawn(duration) ⇒ Object
246 247 248 |
# File 'lib/resqued/master.rb', line 246 def yawn(duration) super(duration, all_listeners.map { |l| l.read_pipe }) end |