Class: Puma::Cluster::Worker

Inherits:
Runner
  • Object
show all
Defined in:
lib/puma/cluster/worker.rb

Overview

This class is instantiated by the `Puma::Cluster` and represents a single worker process.

At the core of this class is running an instance of `Puma::Server` which gets created via the `start_server` method from the `Puma::Runner` class that this inherits from.

Instance Attribute Summary collapse

Attributes inherited from Runner

#app, #ruby_engine

Instance Method Summary collapse

Methods inherited from Runner

#close_control_listeners, #debug, #development?, #error, #load_and_bind, #log, #output_header, #redirect_io, #redirected_io?, #start_control, #start_server, #stop_control, #test?

Constructor Details

#initialize(index:, master:, launcher:, pipes:, server: nil) ⇒ Worker

Returns a new instance of Worker.


14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/puma/cluster/worker.rb', line 14

def initialize(index:, master:, launcher:, pipes:, server: nil)
  super launcher, launcher.events

  @index = index
  @master = master
  @launcher = launcher
  @options = launcher.options
  @check_pipe = pipes[:check_pipe]
  @worker_write = pipes[:worker_write]
  @fork_pipe = pipes[:fork_pipe]
  @wakeup = pipes[:wakeup]
  @server = server
end

Instance Attribute Details

#indexObject (readonly)

Returns the value of attribute index


12
13
14
# File 'lib/puma/cluster/worker.rb', line 12

def index
  @index
end

#masterObject (readonly)

Returns the value of attribute master


12
13
14
# File 'lib/puma/cluster/worker.rb', line 12

def master
  @master
end

Instance Method Details

#runObject


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/puma/cluster/worker.rb', line 28

def run
  title  = "puma: cluster worker #{index}: #{master}"
  title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
  $0 = title

  Signal.trap "SIGINT", "IGNORE"
  Signal.trap "SIGCHLD", "DEFAULT"

  Thread.new do
    Puma.set_thread_name "worker check pipe"
    IO.select [@check_pipe]
    log "! Detected parent died, dying"
    exit! 1
  end

  # If we're not running under a Bundler context, then
  # report the info about the context we will be using
  if !ENV['BUNDLE_GEMFILE']
    if File.exist?("Gemfile")
      log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
    elsif File.exist?("gems.rb")
      log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
    end
  end

  # Invoke any worker boot hooks so they can get
  # things in shape before booting the app.
  @launcher.config.run_hooks :before_worker_boot, index, @launcher.events

  server = @server ||= start_server
  restart_server = Queue.new << true << false

  fork_worker = @options[:fork_worker] && index == 0

  if fork_worker
    restart_server.clear
    worker_pids = []
    Signal.trap "SIGCHLD" do
      wakeup! if worker_pids.reject! do |p|
        Process.wait(p, Process::WNOHANG) rescue true
      end
    end

    Thread.new do
      Puma.set_thread_name "worker fork pipe"
      while (idx = @fork_pipe.gets)
        idx = idx.to_i
        if idx == -1 # stop server
          if restart_server.length > 0
            restart_server.clear
            server.begin_restart(true)
            @launcher.config.run_hooks :before_refork, nil, @launcher.events
            Puma::Util.nakayoshi_gc @events if @options[:nakayoshi_fork]
          end
        elsif idx == 0 # restart server
          restart_server << true << false
        else # fork worker
          worker_pids << pid = spawn_worker(idx)
          @worker_write << "f#{pid}:#{idx}\n" rescue nil
        end
      end
    end
  end

  Signal.trap "SIGTERM" do
    @worker_write << "e#{Process.pid}\n" rescue nil
    restart_server.clear
    server.stop
    restart_server << false
  end

  begin
    @worker_write << "b#{Process.pid}:#{index}\n"
  rescue SystemCallError, IOError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    STDERR.puts "Master seems to have exited, exiting."
    return
  end

  while restart_server.pop
    server_thread = server.run
    stat_thread ||= Thread.new(@worker_write) do |io|
      Puma.set_thread_name "stat payload"
      base_payload = "p#{Process.pid}"

      while true
        begin
          b = server.backlog || 0
          r = server.running || 0
          t = server.pool_capacity || 0
          m = server.max_threads || 0
          rc = server.requests_count || 0
          payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m}, "requests_count": #{rc} }\n!
          io << payload
        rescue IOError
          Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
          break
        end
        sleep Const::WORKER_CHECK_INTERVAL
      end
    end
    server_thread.join
  end

  # Invoke any worker shutdown hooks so they can prevent the worker
  # exiting until any background operations are completed
  @launcher.config.run_hooks :before_worker_shutdown, index, @launcher.events
ensure
  @worker_write << "t#{Process.pid}\n" rescue nil
  @worker_write.close
end