Class: Pitchfork::HttpServer

Inherits:
Object
  • Object
show all
Includes:
HttpResponse, SocketHelper
Defined in:
lib/pitchfork/http_server.rb

Overview

This is the process manager of Pitchfork. This manages worker processes which in turn handle the I/O and application process. Listener sockets are started in the master process and shared with forked worker children.

Defined Under Namespace

Classes: TimeoutHandler

Constant Summary collapse

LISTENERS =

all bound listener sockets note: this is public used by raindrops, but not recommended for use in new projects

Listeners.new
NOOP =
'.'

Constants included from HttpResponse

Pitchfork::HttpResponse::ILLEGAL_HEADER_VALUE, Pitchfork::HttpResponse::STATUS_CODES

Constants included from SocketHelper

SocketHelper::DEFAULTS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from HttpResponse

#append_header, #err_response, #http_response_write, #httpdate

Methods included from SocketHelper

sock_name, tcp_name

Constructor Details

#initialize(app, options = {}) ⇒ HttpServer

Creates a working server on host:port (strange things happen if port isn’t a Number). Use HttpServer::run to start the server and HttpServer.run.join to join the thread that’s processing incoming requests on the socket.



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/pitchfork/http_server.rb', line 102

def initialize(app, options = {})
  @exit_status = 0
  @app = app
  @respawn = false
  @last_check = Pitchfork.time_now
  @promotion_lock = Flock.new("pitchfork-promotion")
  Info.keep_io(@promotion_lock)

  options = options.dup
  @ready_pipe = options.delete(:ready_pipe)
  @init_listeners = options[:listeners].dup || []
  options[:use_defaults] = true
  self.config = Pitchfork::Configurator.new(options)
  self.listener_opts = {}

  proc_name role: 'monitor', status: ARGV.join(' ')

  # We use @control_socket differently in the master and worker processes:
  #
  # * The master process never closes or reinitializes this once
  # initialized.  Signal handlers in the master process will write to
  # it to wake up the master from IO.select in exactly the same manner
  # djb describes in https://cr.yp.to/docs/selfpipe.html
  #
  # * The workers immediately close the pipe they inherit.  See the
  # Pitchfork::Worker class for the pipe workers use.
  @control_socket = []
  @children = Children.new
  @sig_queue = [] # signal queue used for self-piping
  @pid = nil

  # we try inheriting listeners first, so we bind them later.
  # we don't write the pid file until we've bound listeners in case
  # pitchfork was started twice by mistake.  Even though our #pid= method
  # checks for stale/existing pid files, race conditions are still
  # possible (and difficult/non-portable to avoid) and can be likely
  # to clobber the pid if the second start was in quick succession
  # after the first, so we rely on the listener binding to fail in
  # that case.  Some tests (in and outside of this source tree) and
  # monitoring tools may also rely on pid files existing before we
  # attempt to connect to the listener(s)
  config.commit!(self, :skip => [:listeners, :pid])
  @orig_app = app
  # list of signals we care about and trap in master.
  @queue_sigs = [
    :QUIT, :INT, :TERM, :USR2, :TTIN, :TTOU ]

  Info.workers_count = worker_processes
  SharedMemory.preallocate_pages(worker_processes)
end

Instance Attribute Details

#after_mold_forkObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def after_mold_fork
  @after_mold_fork
end

#after_monitor_ready=(value) ⇒ Object (writeonly)

Sets the attribute after_monitor_ready

Parameters:

  • value

    the value to set the attribute after_monitor_ready to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def after_monitor_ready=(value)
  @after_monitor_ready = value
end

#after_request_complete=(value) ⇒ Object (writeonly)

Sets the attribute after_request_complete

Parameters:

  • value

    the value to set the attribute after_request_complete to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def after_request_complete=(value)
  @after_request_complete = value
end

#after_worker_exit=(value) ⇒ Object (writeonly)

Sets the attribute after_worker_exit

Parameters:

  • value

    the value to set the attribute after_worker_exit to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def after_worker_exit=(value)
  @after_worker_exit = value
end

#after_worker_forkObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def after_worker_fork
  @after_worker_fork
end

#after_worker_hard_timeout=(value) ⇒ Object (writeonly)

Sets the attribute after_worker_hard_timeout

Parameters:

  • value

    the value to set the attribute after_worker_hard_timeout to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def after_worker_hard_timeout=(value)
  @after_worker_hard_timeout = value
end

#after_worker_ready=(value) ⇒ Object (writeonly)

Sets the attribute after_worker_ready

Parameters:

  • value

    the value to set the attribute after_worker_ready to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def after_worker_ready=(value)
  @after_worker_ready = value
end

#after_worker_timeout=(value) ⇒ Object (writeonly)

Sets the attribute after_worker_timeout

Parameters:

  • value

    the value to set the attribute after_worker_timeout to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def after_worker_timeout=(value)
  @after_worker_timeout = value
end

#appObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def app
  @app
end

#before_forkObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def before_fork
  @before_fork
end

#before_service_worker_exitObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def before_service_worker_exit
  @before_service_worker_exit
end

#before_service_worker_readyObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def before_service_worker_ready
  @before_service_worker_ready
end

#before_worker_exit=(value) ⇒ Object (writeonly)

Sets the attribute before_worker_exit

Parameters:

  • value

    the value to set the attribute before_worker_exit to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def before_worker_exit=(value)
  @before_worker_exit = value
end

#childrenObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def children
  @children
end

#cleanup_timeoutObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def cleanup_timeout
  @cleanup_timeout
end

#configObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def config
  @config
end

#early_hintsObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def early_hints
  @early_hints
end

#listener_optsObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def listener_opts
  @listener_opts
end

#loggerObject

Returns the value of attribute logger.



87
88
89
# File 'lib/pitchfork/http_server.rb', line 87

def logger
  @logger
end

#orig_appObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def orig_app
  @orig_app
end

#ready_pipeObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def ready_pipe
  @ready_pipe
end

#refork_condition=(value) ⇒ Object (writeonly)

Sets the attribute refork_condition

Parameters:

  • value

    the value to set the attribute refork_condition to.



84
85
86
# File 'lib/pitchfork/http_server.rb', line 84

def refork_condition=(value)
  @refork_condition = value
end

#soft_timeoutObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def soft_timeout
  @soft_timeout
end

#spawn_timeoutObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def spawn_timeout
  @spawn_timeout
end

#timeoutObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def timeout
  @timeout
end

#timeout_signalObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def timeout_signal
  @timeout_signal
end

#worker_processesObject

:stopdoc:



80
81
82
# File 'lib/pitchfork/http_server.rb', line 80

def worker_processes
  @worker_processes
end

Instance Method Details

#check_client_connectionObject



460
461
462
# File 'lib/pitchfork/http_server.rb', line 460

def check_client_connection
  Pitchfork::HttpParser.check_client_connection
end

#check_client_connection=(bool) ⇒ Object



464
465
466
# File 'lib/pitchfork/http_server.rb', line 464

def check_client_connection=(bool)
  Pitchfork::HttpParser.check_client_connection = bool
end

#client_body_buffer_sizeObject



452
453
454
# File 'lib/pitchfork/http_server.rb', line 452

def client_body_buffer_size
  Pitchfork::TeeInput.client_body_buffer_size
end

#client_body_buffer_size=(bytes) ⇒ Object



456
457
458
# File 'lib/pitchfork/http_server.rb', line 456

def client_body_buffer_size=(bytes)
  Pitchfork::TeeInput.client_body_buffer_size = bytes
end

#joinObject

monitors children and receives signals forever (or until a termination signal is sent). This handles signals one-at-a-time time and we’ll happily drop signals in case somebody is signalling us too often.



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/pitchfork/http_server.rb', line 289

def join
  @respawn = true

  proc_name role: 'monitor', status: ARGV.join(' ')

  logger.info "master process ready" # test_exec.rb relies on this message
  if @ready_pipe
    begin
      @ready_pipe.syswrite($$.to_s)
    rescue => e
      logger.warn("grandparent died too soon?: #{e.message} (#{e.class})")
    end
    @ready_pipe = @ready_pipe.close rescue nil
  end
  while true
    begin
      if monitor_loop == StopIteration
        break
      end
    rescue => e
      Pitchfork.log_error(@logger, "master loop error", e)
    end
  end
  stop # gracefully shutdown all workers on our way out
  logger.info "master complete status=#{@exit_status}"
  @exit_status
end

#listen(address, opt = listener_opts[address] || {}) ⇒ Object

add a given address to the listeners set, idempotently Allows workers to add a private, per-process listener via the after_worker_fork hook. Very useful for debugging and testing. :tries may be specified as an option for the number of times to retry, and :delay may be specified as the time in seconds to delay between retries. A negative value for :tries indicates the listen will be retried indefinitely, this is useful when workers belonging to different masters are spawned during a transparent upgrade.



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/pitchfork/http_server.rb', line 228

def listen(address, opt = listener_opts[address] || {})
  address = config.expand_addr(address)
  return if String === address && listener_names.include?(address)

  opt = opt.dup
  delay = opt[:delay] || 0.5
  tries = opt[:tries] || 5
  queues = opt[:queues] ||= 1
  opt[:reuseport] = true if queues > 1

  begin
    io = bind_listen(address, opt)
    unless TCPServer === io || UNIXServer === io
      io.autoclose = false
      io = server_cast(io)
    end
    logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
    Info.keep_io(io)
    LISTENERS << io unless queues > 1
    io
  rescue Errno::EADDRINUSE => err
    logger.error "adding listener failed addr=#{address} (in use)"
    raise err if tries == 0
    tries -= 1
    logger.error "retrying in #{delay} seconds " \
                 "(#{tries < 0 ? 'infinite' : tries} tries left)"
    sleep(delay)
    retry
  rescue => err
    logger.fatal "error adding listener addr=#{address}"
    raise err
  end

  if queues > 1
    ios = [io]

    (queues - 1).times do
      io = bind_listen(address, opt)
      unless TCPServer === io || UNIXServer === io
        io.autoclose = false
        io = server_cast(io)
      end
      logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno} (SO_REUSEPORT)"
      Info.keep_io(io)
      ios << io
    rescue => err
      logger.fatal "error adding listener addr=#{address}"
      raise err
    end

    io = Listeners::Group.new(ios, queues_per_worker: opt[:queues_per_worker] || queues - 1)
    LISTENERS << io
  end

  io
end

#listeners=(listeners) ⇒ Object

replaces current listener set with listeners. This will close the socket if it will not exist in the new listener set



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/pitchfork/http_server.rb', line 198

def listeners=(listeners)
  unless LISTENERS.empty?
    raise "Listeners can only be initialized once"
  end

  cur_names, dead_names = [], []
  listener_names.each do |name|
    if name.start_with?('/')
      # mark unlinked sockets as dead so we can rebind them
      (File.socket?(name) ? cur_names : dead_names) << name
    else
      cur_names << name
    end
  end
  listener_names(listeners).each { |addr| listen(addr) }
end

#monitor_loop(sleep = true) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/pitchfork/http_server.rb', line 317

def monitor_loop(sleep = true)
  reap_all_workers

  if REFORKING_AVAILABLE && @respawn && @children.molds.empty?
    logger.info("No mold alive, shutting down")
    @exit_status = 1
    @sig_queue << :TERM
    @respawn = false
  end

  case message = @sig_queue.shift
  when nil
    # avoid murdering workers after our master process (or the
    # machine) comes out of suspend/hibernation
    if (@last_check + @timeout) >= (@last_check = Pitchfork.time_now)
      sleep_time = murder_lazy_workers
    else
      sleep_time = @timeout/2.0 + 1
      @logger.debug("waiting #{sleep_time}s after suspend/hibernation")
    end
    if @respawn
      maintain_worker_count
      restart_outdated_workers if REFORKING_AVAILABLE
    end

    master_sleep(sleep_time) if sleep
  when :QUIT, :TERM # graceful shutdown
    SharedMemory.shutting_down!
    logger.info "#{message} received, starting graceful shutdown"
    return StopIteration
  when :INT # immediate shutdown
    SharedMemory.shutting_down!
    logger.info "#{message} received, starting immediate shutdown"
    stop(false)
    return StopIteration
  when :USR2 # trigger a promotion
    if @respawn
      trigger_refork
    else
      logger.error "Can't trigger a refork as the server is shutting down"
    end
  when :TTIN
    @respawn = true
    self.worker_processes += 1
  when :TTOU
    self.worker_processes -= 1 if self.worker_processes > 0
  when Message::WorkerSpawned
    worker = @children.update(message)
    # TODO: should we send a message to the worker to acknowledge?
    logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} registered"
  when Message::MoldSpawned
    new_mold = @children.update(message)
    logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned")
  when Message::ServiceSpawned
    new_service = @children.update(message)
    logger.info("service pid=#{new_service.pid} gen=#{new_service.generation} spawned")
  when Message::MoldReady
    old_molds = @children.molds
    new_mold = @children.update(message)
    logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready")
    old_molds.each do |old_mold|
      logger.info("Terminating old mold pid=#{old_mold.pid} gen=#{old_mold.generation}")
      old_mold.soft_kill(:TERM)
    end
  else
    logger.error("Unexpected message in sig_queue #{message.inspect}")
    logger.error(@sig_queue.inspect)
  end
end

#rewindable_inputObject



443
444
445
# File 'lib/pitchfork/http_server.rb', line 443

def rewindable_input
  Pitchfork::HttpParser.input_class.method_defined?(:rewind)
end

#rewindable_input=(bool) ⇒ Object



447
448
449
450
# File 'lib/pitchfork/http_server.rb', line 447

def rewindable_input=(bool)
  Pitchfork::HttpParser.input_class = bool ?
                              Pitchfork::TeeInput : Pitchfork::StreamInput
end

#service_exit(service) ⇒ Object



429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/pitchfork/http_server.rb', line 429

def service_exit(service)
  logger.info "service pid=#{service.pid} gen=#{service.generation} exiting"
  proc_name status: "exiting"

  if @before_service_worker_exit
    begin
      @before_service_worker_exit.call(self, service)
    rescue => error
      Pitchfork.log_error(logger, "before_service_worker_exit error", error)
    end
  end
  Process.exit
end

#start(sync = true) ⇒ Object

Runs the thing. Returns self so you can run join on it



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/pitchfork/http_server.rb', line 154

def start(sync = true)
  Pitchfork.enable_child_subreaper # noop if not supported

  # This socketpair is used to wake us up from select(2) in #join when signals
  # are trapped.  See trap_deferred.
  # It's also used by newly spawned children to send their soft_signal pipe
  # to the master when they are spawned.
  @control_socket.replace(Pitchfork.socketpair)
  Info.keep_ios(@control_socket)
  @master_pid = $$

  # setup signal handlers before writing pid file in case people get
  # trigger happy and send signals as soon as the pid file exists.
  # Note that signals don't actually get handled until the #join method
  @queue_sigs.each { |sig| trap(sig) { @sig_queue << sig; awaken_master } }
  trap(:CHLD) { awaken_master }

  if REFORKING_AVAILABLE
    spawn_initial_mold
    wait_for_pending_workers
    unless @children.mold
      raise BootFailure, "The initial mold failed to boot"
    end
  else
    build_app!
    bind_listeners!
    after_mold_fork.call(self, Worker.new(nil, pid: $$).promoted!(@spawn_timeout))
  end

  if sync
    spawn_missing_workers
    # We could just return here as we'd register them later in #join.
    # However a good part of the test suite assumes #start only return
    # once all initial workers are spawned.
    wait_for_pending_workers
  end

  @after_monitor_ready&.call(self)

  self
end

#stop(graceful = true) ⇒ Object

Terminates all workers, but does not exit master process



388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/pitchfork/http_server.rb', line 388

def stop(graceful = true)
  proc_name role: 'monitor', status: 'shutting down'
  @respawn = false
  SharedMemory.shutting_down!
  wait_for_pending_workers
  LISTENERS.each(&:close).clear

  limit = Pitchfork.time_now + timeout
  until @children.empty? || Pitchfork.time_now > limit
    if graceful
      @children.soft_kill_all(:TERM)
    else
      @children.hard_kill_all(:INT)
    end
    if monitor_loop(false) == StopIteration
      return StopIteration
    end
  end

  @children.each do |child|
    if child.pid
      @children.hard_kill(@timeout_signal.call(child.pid), child)
    end
  end
  @promotion_lock.unlink
end

#worker_exit(worker) ⇒ Object



415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/pitchfork/http_server.rb', line 415

def worker_exit(worker)
  logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} exiting"
  proc_name status: "exiting"

  if @before_worker_exit
    begin
      @before_worker_exit.call(self, worker)
    rescue => error
      Pitchfork.log_error(logger, "before_worker_exit error", error)
    end
  end
  Process.exit
end