Class: Pipemaster::Server
- Inherits:
- Struct
- Object
- Struct
- Pipemaster::Server
- Includes:
- SocketHelper
- Defined in:
- lib/pipemaster/server.rb
Constant Summary
- IO_PURGATORY =
prevents IO objects in here from being GC-ed
[]
- LISTENERS =
all bound listener sockets
[]
- WORKERS =
This hash maps PIDs to Workers
{}
- BACKGROUND =
Background workers.
[]
- START_CTX =
We populate this at startup so we can figure out how to re-execute and upgrade the currently running instance of Pipemaster. This Hash is considered a stable interface, and changing its contents lets you switch between different installations of Pipemaster, or even different installations of the same application, without downtime. The keys of this constant Hash are described as follows:
-
0 - the path to the pipemaster executable
-
:argv - a deep copy of the ARGV array the executable originally saw
-
:cwd - the working directory of the application, which is where you originally started Pipemaster.
The following example may be used in your Pipemaster config file to change your working directory during a config reload (HUP) without upgrading or restarting:
Dir.chdir(Pipemaster::Server::START_CTX[:cwd] = path)
To change your Pipemaster executable to a different path without downtime, you can set the following in your Pipemaster config file, HUP and then continue with the traditional USR2 + QUIT upgrade steps:
Pipemaster::Server::START_CTX[0] = "/home/bofh/1.9.2/bin/pipemaster"
-
{
  :argv => ARGV.map { |arg| arg.dup },
  :cwd => lambda {
      # favor ENV['PWD'] since it is (usually) symlink aware for
      # Capistrano and like systems
      begin
        a = File.stat(pwd = ENV['PWD'])
        b = File.stat(Dir.pwd)
        a.ino == b.ino && a.dev == b.dev ? pwd : Dir.pwd
      rescue
        Dir.pwd
      end
    }.call,
  0 => $0.dup,
}
- DEFAULT_COMMANDS =
{
  :list => lambda { $stdout << (DEFAULT_COMMANDS.keys | commands.keys).sort.join("\n") },
  :ping => lambda { $stdout << VERSION }
}
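The :cwd entry of START_CTX prefers ENV['PWD'] over Dir.pwd when both name the same directory, which keeps symlinked paths intact. The following is a minimal sketch of that comparison as a free-standing method. The name `symlink_aware_cwd` and its `env` parameter are hypothetical and not part of Pipemaster.

```ruby
# Hypothetical helper (not part of Pipemaster): returns env['PWD'] when it
# refers to the same directory as Dir.pwd, otherwise falls back to Dir.pwd.
def symlink_aware_cwd(env = ENV)
  pwd = env['PWD']
  a = File.stat(pwd)      # raises if PWD is unset or points nowhere
  b = File.stat(Dir.pwd)
  # The same inode on the same device means both paths name one directory.
  a.ino == b.ino && a.dev == b.dev ? pwd : Dir.pwd
rescue StandardError
  Dir.pwd
end
```

Passing a plain Hash instead of ENV makes it easy to try the fallback path without touching the real environment.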
Instance Attribute Summary
-
#after_fork ⇒ Object
Returns the value of attribute after_fork.
-
#background ⇒ Object
Returns the value of attribute background.
-
#before_exec ⇒ Object
Returns the value of attribute before_exec.
-
#before_fork ⇒ Object
Returns the value of attribute before_fork.
-
#commands ⇒ Object
Returns the value of attribute commands.
-
#config ⇒ Object
Returns the value of attribute config.
-
#init_listeners ⇒ Object
Returns the value of attribute init_listeners.
-
#listener_opts ⇒ Object
Returns the value of attribute listener_opts.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#master_pid ⇒ Object
Returns the value of attribute master_pid.
-
#pid ⇒ Object
Returns the value of attribute pid.
-
#ready_pipe ⇒ Object
Returns the value of attribute ready_pipe.
-
#reexec_pid ⇒ Object
Returns the value of attribute reexec_pid.
-
#setup ⇒ Object
Returns the value of attribute setup.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Class Method Summary
Instance Method Summary
-
#initialize(options = {}) ⇒ Server
constructor
A new instance of Server.
- #join ⇒ Object
-
#kill_each_worker(signal) ⇒ Object
delivers a signal to each worker.
- #kill_worker(signal, wpid) ⇒ Object
-
#listen(address, opt = {}.merge(listener_opts[address] || {})) ⇒ Object
Adds the given address to the listeners set, idempotently. Allows workers to add a private, per-process listener via the after_fork hook.
-
#listener_names(listeners = LISTENERS) ⇒ Object
returns an array of string names for the given listener array.
-
#listeners=(listeners) ⇒ Object
Replaces the current listener set with listeners.
- #load_config! ⇒ Object
- #process_request(socket, worker) ⇒ Object
- #reap_all_workers ⇒ Object
- #redirect_io(io, path) ⇒ Object
- #reexec ⇒ Object
- #restart_background ⇒ Object
- #run_in_background(name, worker, &block) ⇒ Object
- #start ⇒ Object
- #stderr_path=(path) ⇒ Object
- #stdout_path=(path) ⇒ Object
-
#stop(graceful = true) ⇒ Object
Terminates all workers, but does not exit master process.
-
#unlink_pid_safe(path) ⇒ Object
Unlinks the PID file at the given path if it contains the current PID. This is still potentially racy without locking the directory (which is non-portable and may interact badly with other programs), but the window for hitting the race condition is small.
-
#valid_pid?(path) ⇒ Boolean
returns a PID if a given path contains a non-stale PID file, nil otherwise.
Methods included from SocketHelper
#bind_listen, #log_buffer_sizes, #server_cast, #set_server_sockopt, #set_tcp_sockopt, #sock_name
Constructor Details
#initialize(options = {}) ⇒ Server
Returns a new instance of Server.
# File 'lib/pipemaster/server.rb', line 75
def initialize(options = {})
  self.reexec_pid = 0
  self.ready_pipe = options.delete(:ready_pipe)
  self.init_listeners = options[:listeners] ? options[:listeners].dup : []
  self.config = Configurator.new(options.merge(:use_defaults => true))
  config.commit!(self, :skip => [:listeners, :pid])
end
Instance Attribute Details
#after_fork ⇒ Object
Returns the value of attribute after_fork
# File 'lib/pipemaster/server.rb', line 9
def after_fork
  @after_fork
end
#background ⇒ Object
Returns the value of attribute background
# File 'lib/pipemaster/server.rb', line 9
def background
  @background
end
#before_exec ⇒ Object
Returns the value of attribute before_exec
# File 'lib/pipemaster/server.rb', line 9
def before_exec
  @before_exec
end
#before_fork ⇒ Object
Returns the value of attribute before_fork
# File 'lib/pipemaster/server.rb', line 9
def before_fork
  @before_fork
end
#commands ⇒ Object
Returns the value of attribute commands
# File 'lib/pipemaster/server.rb', line 9
def commands
  @commands
end
#config ⇒ Object
Returns the value of attribute config
# File 'lib/pipemaster/server.rb', line 9
def config
  @config
end
#init_listeners ⇒ Object
Returns the value of attribute init_listeners
# File 'lib/pipemaster/server.rb', line 9
def init_listeners
  @init_listeners
end
#listener_opts ⇒ Object
Returns the value of attribute listener_opts
# File 'lib/pipemaster/server.rb', line 9
def listener_opts
  @listener_opts
end
#logger ⇒ Object
Returns the value of attribute logger
# File 'lib/pipemaster/server.rb', line 9
def logger
  @logger
end
#master_pid ⇒ Object
Returns the value of attribute master_pid
# File 'lib/pipemaster/server.rb', line 9
def master_pid
  @master_pid
end
#pid ⇒ Object
Returns the value of attribute pid
# File 'lib/pipemaster/server.rb', line 9
def pid
  @pid
end
#ready_pipe ⇒ Object
Returns the value of attribute ready_pipe
# File 'lib/pipemaster/server.rb', line 9
def ready_pipe
  @ready_pipe
end
#reexec_pid ⇒ Object
Returns the value of attribute reexec_pid
# File 'lib/pipemaster/server.rb', line 9
def reexec_pid
  @reexec_pid
end
#setup ⇒ Object
Returns the value of attribute setup
# File 'lib/pipemaster/server.rb', line 9
def setup
  @setup
end
#timeout ⇒ Object
Returns the value of attribute timeout
# File 'lib/pipemaster/server.rb', line 9
def timeout
  @timeout
end
Class Method Details
Instance Method Details
#join ⇒ Object
# File 'lib/pipemaster/server.rb', line 125
def join
  trap(:QUIT) { stop }
  [:TERM, :INT].each { |sig| trap(sig) { stop false } }
  self.master_pid = $$
  self.pid = config[:pid]
  trap(:CHLD) { reap_all_workers }
  trap :USR1 do
    logger.info "master reopening logs..."
    Pipemaster::Util.reopen_logs
    logger.info "master done reopening logs"
  end
  trap :HUP do
    reloaded = true
    reap_all_workers
    load_config!
    restart_background
  end
  trap(:USR2) { reap_all_workers ; reexec }

  $0 = "pipemaster"
  logger.info "master process ready" # test_exec.rb relies on this message
  if ready_pipe
    ready_pipe.syswrite($$.to_s)
    ready_pipe.close rescue nil
    self.ready_pipe = nil
  end
  begin
    reloaded = false
    restart_background
    while selected = Kernel.select(LISTENERS)
      selected.first.each do |socket|
        client = socket.accept_nonblock
        worker = Worker.new
        before_fork.call(self, worker)
        WORKERS[fork { process_request client, worker }] = worker
      end
    end
  rescue Errno::EINTR
    retry
  rescue Errno::EBADF # Shutdown
    retry if reloaded
  rescue => ex
    logger.error "Unhandled master loop exception #{ex.inspect}."
    logger.error ex.backtrace.join("\n")
    sleep 1 # This is often failure to bind, so wait a bit
    retry
  end
  stop # gracefully shutdown all workers on our way out
  logger.info "master complete"
  unlink_pid_safe(pid) if pid
end
#kill_each_worker(signal) ⇒ Object
delivers a signal to each worker
# File 'lib/pipemaster/server.rb', line 192
def kill_each_worker(signal)
  WORKERS.keys.each { |wpid| kill_worker(signal, wpid) }
end
#kill_worker(signal, wpid) ⇒ Object
# File 'lib/pipemaster/server.rb', line 324
def kill_worker(signal, wpid)
  begin
    Process.kill(signal, wpid)
  rescue Errno::ESRCH
    worker = WORKERS.delete(wpid)
    BACKGROUND.delete(wpid)
  end
end
#listen(address, opt = {}.merge(listener_opts[address] || {})) ⇒ Object
Adds the given address to the listeners set, idempotently. Allows workers to add a private, per-process listener via the after_fork hook, which is very useful for debugging and testing. :tries may be specified as an option for the number of times to retry, and :delay as the time in seconds to wait between retries. A negative value for :tries means the listen will be retried indefinitely; this is useful when workers belonging to different masters are spawned during a transparent upgrade.
# File 'lib/pipemaster/server.rb', line 295
def listen(address, opt = {}.merge(listener_opts[address] || {}))
  address = config.(address)
  return if String === address && listener_names.include?(address)

  delay = opt[:delay] || 0.5
  tries = opt[:tries] || 5
  begin
    io = bind_listen(address, opt)
    unless TCPServer === io || UNIXServer === io
      IO_PURGATORY << io
      io = server_cast(io)
    end
    logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
    LISTENERS << io
    io
  rescue Errno::EADDRINUSE => err
    logger.error "adding listener failed addr=#{address} (in use)"
    raise err if tries == 0
    tries -= 1
    logger.error "retrying in #{delay} seconds " \
                 "(#{tries < 0 ? 'infinite' : tries} tries left)"
    sleep(delay)
    retry
  rescue => err
    logger.fatal "error adding listener addr=#{address}"
    raise err
  end
end
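The :tries and :delay handling in #listen can be isolated from the socket code. The following is a minimal sketch of the same countdown, assuming a hypothetical helper named `with_bind_retries` whose block stands in for the bind attempt; it is not part of Pipemaster.

```ruby
# Hypothetical stand-in for the retry loop in #listen. The block plays the
# role of bind_listen and is re-run whenever it raises Errno::EADDRINUSE.
def with_bind_retries(tries: 5, delay: 0.5)
  begin
    yield
  rescue Errno::EADDRINUSE
    raise if tries == 0   # out of attempts: hand the error to the caller
    tries -= 1            # a negative count never reaches 0, so it retries forever
    sleep(delay)
    retry
  end
end
```

With `tries: 2` the block runs at most three times: the first attempt plus two retries.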
#listener_names(listeners = LISTENERS) ⇒ Object
returns an array of string names for the given listener array
# File 'lib/pipemaster/server.rb', line 227
def listener_names(listeners = LISTENERS)
  listeners.map { |io| sock_name(io) }
end
#listeners=(listeners) ⇒ Object
Replaces the current listener set with listeners. Any socket that does not exist in the new listener set is closed.
# File 'lib/pipemaster/server.rb', line 198
def listeners=(listeners)
  cur_names, dead_names = [], []
  listener_names.each do |name|
    if ?/ == name[0]
      # mark unlinked sockets as dead so we can rebind them
      (File.socket?(name) ? cur_names : dead_names) << name
    else
      cur_names << name
    end
  end
  set_names = listener_names(listeners)
  dead_names.concat(cur_names - set_names).uniq!

  LISTENERS.delete_if do |io|
    if dead_names.include?(sock_name(io))
      IO_PURGATORY.delete_if do |pio|
        pio.fileno == io.fileno && (pio.close rescue nil).nil? # true
      end
      (io.close rescue nil).nil? # true
    else
      set_server_sockopt(io, listener_opts[sock_name(io)])
      false
    end
  end

  (set_names - cur_names).each { |addr| listen(addr) }
end
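The name bookkeeping in #listeners= decides which sockets to close and which addresses to bind. The following is a minimal sketch of just that set arithmetic, with no real sockets involved. The method `plan_listener_change` and its `socket_exists` parameter are hypothetical; the latter stands in for File.socket?.

```ruby
# Hypothetical pure-Ruby rendering of the name bookkeeping in #listeners=.
# current: names of listeners bound right now; wanted: names requested.
def plan_listener_change(current, wanted, socket_exists: File.method(:socket?))
  cur_names, dead_names = [], []
  current.each do |name|
    if name.start_with?('/')
      # A UNIX socket whose file was unlinked is dead and may be rebound.
      (socket_exists.call(name) ? cur_names : dead_names) << name
    else
      cur_names << name
    end
  end
  # Anything still bound but no longer wanted is closed as well.
  dead_names.concat(cur_names - wanted).uniq!
  { :close => dead_names, :open => wanted - cur_names }
end
```

An unlinked UNIX socket that is still wanted therefore shows up in both lists: it is closed and then bound again.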
#load_config! ⇒ Object
# File 'lib/pipemaster/server.rb', line 353
def load_config!
  begin
    logger.info "reloading pipefile=#{config.config_file}"
    config[:listeners].replace(init_listeners)
    config.reload
    config.commit!(self)
    Pipemaster::Util.reopen_logs
    logger.info "done reloading pipefile=#{config.config_file}"
  rescue => e
    logger.error "error reloading pipefile=#{config.config_file}: " \
                 "#{e.class} #{e.}"
  end
end
#process_request(socket, worker) ⇒ Object
# File 'lib/pipemaster/server.rb', line 421
def process_request(socket, worker)
  trap(:QUIT) { exit }
  [:TERM, :INT].each { |sig| trap(sig) { exit! } }
  [:USR1, :USR2].each { |sig| trap(sig, nil) }
  trap(:CHLD, 'DEFAULT')

  WORKERS.clear
  LISTENERS.each { |sock| sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
  after_fork.call(self, worker)
  $stdout.reopen socket
  $stdin.reopen socket
  begin
    length = socket.readpartial(4).unpack("N")[0]
    name, *args = socket.read(length).split("\0")

    $0 = "pipemaster $#{name}"
    logger.info "#{Process.pid} #{name} #{args.join(' ')}"

    ARGV.replace args
    if command = commands[name.to_sym]
      command.call *args
    elsif command = DEFAULT_COMMANDS[name.to_sym]
      instance_eval &command
    else
      raise ArgumentError, "No command #{name}"
    end
    logger.info "exit command #{name}"
    socket.write 0.chr
  rescue SystemExit => ex
    logger.info "exit command #{name} with #{ex.status}"
    socket.write ex.status.chr
  rescue Exception => ex
    logger.info "failed command #{name}: #{ex.}"
    socket.write "#{ex.class.name}: #{ex.}\n"
    socket.write 127.chr
  ensure
    socket.close_write
    socket.close
    exit!
  end
end
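Read from the client side, process_request expects a 4-byte big-endian length, followed by the command name and its arguments joined with NUL bytes; the reply is whatever the command writes, followed by one status byte. The following is a minimal sketch of that framing. The helper names `encode_request`, `decode_request` and `split_reply` are hypothetical and not part of Pipemaster.

```ruby
# Hypothetical helpers illustrating the framing that process_request reads.

# Builds a request frame: 4-byte big-endian length, then "name\0arg1\0arg2".
def encode_request(name, *args)
  payload = [name, *args].join("\0").b
  [payload.bytesize].pack("N") + payload
end

# Mirrors the server side: read the length, then split the payload on NUL.
def decode_request(frame)
  length = frame.byteslice(0, 4).unpack("N")[0]
  name, *args = frame.byteslice(4, length).split("\0")
  [name, args]
end

# Splits a complete reply into the command's output and its status byte.
def split_reply(reply)
  [reply.byteslice(0, reply.bytesize - 1), reply.getbyte(-1)]
end
```

A status of 0 corresponds to a normal return, 127 to an unhandled exception, and any other value to the status passed to exit.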
#reap_all_workers ⇒ Object
# File 'lib/pipemaster/server.rb', line 333
def reap_all_workers
  begin
    loop do
      wpid, status = Process.waitpid2(-1, Process::WNOHANG)
      wpid or break
      if reexec_pid == wpid
        logger.error "reaped #{status.inspect} exec()-ed"
        self.reexec_pid = 0
        self.pid = pid.chomp('.oldbin') if pid
        $0 = 'pipemaster'
      else
        WORKERS.delete(wpid) rescue nil
        BACKGROUND.delete(wpid)
        logger.info "reaped #{status.inspect} "
      end
    end
  rescue Errno::ECHILD
  end
end
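The core of reap_all_workers is a non-blocking wait loop. The following is a minimal sketch of that loop on its own, assuming a platform where fork is available. The method `reap_children` is hypothetical and returns what it reaped instead of updating WORKERS and BACKGROUND.

```ruby
# Hypothetical stand-alone version of the wait loop in reap_all_workers.
# Returns [pid, exit status] pairs for every child that has already exited.
def reap_children
  reaped = []
  loop do
    # WNOHANG makes waitpid2 return nil instead of blocking when children
    # exist but none of them has exited yet.
    wpid, status = Process.waitpid2(-1, Process::WNOHANG)
    wpid or break
    reaped << [wpid, status.exitstatus]
  end
  reaped
rescue Errno::ECHILD
  # Raised when there are no children left at all.
  reaped
end
```

Because the call never blocks, it is safe to run from a CHLD handler or from a polling loop such as the one in #stop.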
#redirect_io(io, path) ⇒ Object
# File 'lib/pipemaster/server.rb', line 234
def redirect_io(io, path)
  File.open(path, 'ab') { |fp| io.reopen(fp) } if path
  io.sync = true
end
#reexec ⇒ Object
# File 'lib/pipemaster/server.rb', line 367
def reexec
  if reexec_pid > 0
    begin
      Process.kill(0, reexec_pid)
      logger.error "reexec-ed child already running PID:#{reexec_pid}"
      return
    rescue Errno::ESRCH
      self.reexec_pid = 0
    end
  end

  if pid
    old_pid = "#{pid}.oldbin"
    prev_pid = pid.dup
    begin
      self.pid = old_pid # clear the path for a new pid file
    rescue ArgumentError
      logger.error "old PID:#{valid_pid?(old_pid)} running with " \
                   "existing pid=#{old_pid}, refusing rexec"
      return
    rescue => e
      logger.error "error writing pid=#{old_pid} #{e.class} #{e.}"
      return
    end
  end

  self.reexec_pid = fork do
    listener_fds = LISTENERS.map { |sock| sock.fileno }
    ENV['PIPEMASTER_FD'] = listener_fds.join(',')
    Dir.chdir(START_CTX[:cwd])
    cmd = [ START_CTX[0] ].concat(START_CTX[:argv])

    # avoid leaking FDs we don't know about, but let before_exec
    # unset FD_CLOEXEC, if anything else in the app eventually
    # relies on FD inheritance.
    (3..1024).each do |io|
      next if listener_fds.include?(io)
      io = IO.for_fd(io) rescue nil
      io or next
      IO_PURGATORY << io
      io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    end
    logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
    before_exec.call(self)
    exec(*cmd)
  end
  $0 = 'pipemaster (old)'
end
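reexec passes the listener sockets to the new master as a comma-separated list of file descriptor numbers in the PIPEMASTER_FD environment variable, and #start parses the same variable when it inherits them. The following is a minimal sketch of that round trip. The helper names are hypothetical; Pipemaster inlines both steps.

```ruby
# Hypothetical helpers; Pipemaster performs these steps inline.

# What reexec stores in ENV['PIPEMASTER_FD'] before calling exec.
def export_listener_fds(fds)
  fds.join(',')
end

# What #start does with ENV['PIPEMASTER_FD']; nil (unset) yields no FDs.
def import_listener_fds(value)
  value.to_s.split(/,/).map { |fd| fd.to_i }
end
```

An unset variable simply produces an empty list, which is why a fresh start and a re-executed start can share one code path.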
#restart_background ⇒ Object
# File 'lib/pipemaster/server.rb', line 463
def restart_background
  # Gracefully shut down all background processes.
  BACKGROUND.delete_if { |wpid| Process.kill(:QUIT, wpid) rescue true }
  # Start them again.
  background.each do |name, block|
    worker = Worker.new
    before_fork.call self, worker
    pid = fork { run_in_background name, worker, &block }
    BACKGROUND << pid
    WORKERS[pid] = worker
  end
end
#run_in_background(name, worker, &block) ⇒ Object
# File 'lib/pipemaster/server.rb', line 476
def run_in_background(name, worker, &block)
  trap(:QUIT) { exit }
  [:TERM, :INT].each { |sig| trap(sig) { exit! } }
  [:USR1, :USR2].each { |sig| trap(sig, nil) }
  trap(:CHLD, 'DEFAULT')

  WORKERS.clear
  LISTENERS.each { |sock| sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
  after_fork.call self, worker
  $0 = "pipemaster/#{name}"
  logger.info "background worker #{name}"
  block.call
  logger.info "finished worker #{name}"
rescue SystemExit => ex
  logger.info "finished worker #{name} with #{ex.status}"
rescue =>ex
  logger.info "failed worker #{name}: #{ex.}"
  socket.write "#{ex.class.name}: #{ex.}\n"
  socket.write 127.chr
ensure
  exit!
end
#start ⇒ Object
# File 'lib/pipemaster/server.rb', line 84
def start
  BasicSocket.do_not_reverse_lookup = true

  # inherit sockets from parents, they need to be plain Socket objects
  # before they become UNIXServer or TCPServer
  inherited = ENV['PIPEMASTER_FD'].to_s.split(/,/).map do |fd|
    io = Socket.for_fd(fd.to_i)
    set_server_sockopt(io, listener_opts[sock_name(io)])
    IO_PURGATORY << io
    logger.info "inherited addr=#{sock_name(io)} fd=#{fd}"
    server_cast(io)
  end

  config_listeners = config[:listeners].dup
  LISTENERS.replace(inherited)

  # we start out with generic Socket objects that get cast to either
  # TCPServer or UNIXServer objects; but since the Socket objects
  # share the same OS-level file descriptor as the higher-level *Server
  # objects; we need to prevent Socket objects from being garbage-collected
  config_listeners -= listener_names
  if config_listeners.empty? && LISTENERS.empty?
    config_listeners << Pipemaster::DEFAULT_LISTEN
    init_listeners << Pipemaster::DEFAULT_LISTEN
    START_CTX[:argv] << "-s#{Pipemaster::DEFAULT_LISTEN}"
  end
  config_listeners.each { |addr| listen(addr) }
  raise ArgumentError, "no listeners" if LISTENERS.empty?

  self.pid = config[:pid]
  self.master_pid = $$
  if setup
    if defined?(Gem) && Gem.respond_to?(:refresh)
      logger.info "Refreshing Gem list"
      Gem.refresh
    end
    setup.call
    logger.info "setup completed"
  end
  self
end
#stderr_path=(path) ⇒ Object
# File 'lib/pipemaster/server.rb', line 232
def stderr_path=(path); redirect_io($stderr, path); end
#stdout_path=(path) ⇒ Object
# File 'lib/pipemaster/server.rb', line 231
def stdout_path=(path); redirect_io($stdout, path); end
#stop(graceful = true) ⇒ Object
Terminates all workers, but does not exit master process
# File 'lib/pipemaster/server.rb', line 180
def stop(graceful = true)
  self.listeners = []
  limit = Time.now + timeout
  until WORKERS.empty? || Time.now > limit
    kill_each_worker(graceful ? :QUIT : :TERM)
    sleep(0.1)
    reap_all_workers
  end
  kill_each_worker(:KILL)
end
#unlink_pid_safe(path) ⇒ Object
Unlinks the PID file at the given path if it contains the current PID. This is still potentially racy without locking the directory (which is non-portable and may interact badly with other programs), but the window for hitting the race condition is small.
# File 'lib/pipemaster/server.rb', line 268
def unlink_pid_safe(path)
  (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
end
#valid_pid?(path) ⇒ Boolean
returns a PID if a given path contains a non-stale PID file, nil otherwise.
# File 'lib/pipemaster/server.rb', line 274
def valid_pid?(path)
  wpid = File.read(path).to_i
  wpid <= 0 and return nil
  begin
    Process.kill(0, wpid)
    wpid
  rescue Errno::ESRCH
    # don't unlink stale pid files, racy without non-portable locking...
  end
rescue Errno::ENOENT
end
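The two PID file helpers, valid_pid? and unlink_pid_safe, can be tried outside a Server instance. The following is a minimal sketch, assuming hypothetical free-standing methods named `live_pid_in` and `unlink_if_ours` that follow the same logic. Like the original, it does not handle Errno::EPERM from a process owned by another user.

```ruby
# Hypothetical free-standing equivalents of valid_pid? and unlink_pid_safe.

# Returns the PID stored at path if that process exists, nil otherwise.
def live_pid_in(path)
  wpid = File.read(path).to_i
  return nil if wpid <= 0
  Process.kill(0, wpid) # signal 0 checks for existence without delivering anything
  wpid
rescue Errno::ESRCH, Errno::ENOENT
  nil # stale or missing PID file; the file itself is left alone
end

# Removes the PID file only when it names the current process.
def unlink_if_ours(path)
  (File.read(path).to_i == Process.pid and File.unlink(path)) rescue nil
end
```

Leaving stale files in place is deliberate: deleting them here would widen the race that the documentation for unlink_pid_safe describes.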