Class: Puma::Cluster::Worker

Inherits:
Runner
  • Object
show all
Defined in:
lib/puma/cluster/worker.rb

Overview

This class is instantiated by the `Puma::Cluster` and represents a single worker process.

At the core of this class is running an instance of `Puma::Server` which gets created via the `start_server` method from the `Puma::Runner` class that this inherits from.

Instance Attribute Summary collapse

Attributes inherited from Runner

#app, #options, #ruby_engine

Instance Method Summary collapse

Methods inherited from Runner

#close_control_listeners, #debug, #development?, #error, #load_and_bind, #log, #output_header, #redirect_io, #redirected_io?, #start_control, #start_server, #stop_control, #test?, #wakeup!

Constructor Details

#initialize(index:, master:, launcher:, pipes:, server: nil) ⇒ Worker

Returns a new instance of Worker.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/puma/cluster/worker.rb', line 17

# Builds a worker and stores everything `run` later needs.
#
# @param index the slot number of this worker; `run` puts it in the process
#   title and treats index 0 specially when `:fork_worker` is enabled
# @param master identifier of the master process; `run` passes it to
#   `Process.kill` (presumably a pid -- confirm against the caller)
# @param launcher forwarded unchanged to `Runner#initialize`
# @param pipes collection indexed by `:check_pipe`, `:worker_write`,
#   `:fork_pipe` and `:wakeup`
# @param server optional pre-built server; when nil, `run` creates one
#   with `start_server`
def initialize(index:, master:, launcher:, pipes:, server: nil)
  super(launcher)

  # Plain values supplied by the caller.
  @index, @master, @server = index, master, server

  # Scratch hash handed to every hook invocation made by `run`.
  @hook_data = {}

  # Communication channels shared with the process that created us.
  @check_pipe   = pipes[:check_pipe]
  @worker_write = pipes[:worker_write]
  @fork_pipe    = pipes[:fork_pipe]
  @wakeup       = pipes[:wakeup]
end

Instance Attribute Details

#index ⇒ Object (readonly)

:nodoc:



15
16
17
# File 'lib/puma/cluster/worker.rb', line 15

# Reader for the worker's index as given to the constructor's `index:`
# keyword. `run` embeds it in the process title and compares it with 0
# to decide whether this worker handles fork requests.
def index
  @index
end

#master ⇒ Object (readonly)

:nodoc:



15
16
17
# File 'lib/puma/cluster/worker.rb', line 15

# Reader for the value given to the constructor's `master:` keyword.
# `run` embeds it in the process title and passes it to `Process.kill`,
# so it is presumably the master process's pid -- confirm against the caller.
def master
  @master
end

Instance Method Details

#run ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/puma/cluster/worker.rb', line 30

# Main body of the worker process: boots the server, reports to the master
# over @worker_write, and keeps (re)running the server until told to stop.
#
# Sequence, as implemented below:
# 1. Set the process title and signal dispositions.
# 2. Start a watchdog thread on @check_pipe that hard-exits the process.
# 3. Run the :before_worker_boot hooks, then obtain the server (the one
#    passed to the constructor, or a new one from start_server).
# 4. When the :fork_worker option is set and this is worker 0, start a
#    thread that services requests arriving on @fork_pipe.
# 5. Install the SIGTERM handler and write the "b<pid>:<index>" line.
# 6. Loop on the restart_server queue, running the server and a periodic
#    stats writer, until a false value is popped.
# 7. Run the :before_worker_shutdown hooks; the ensure clause then writes
#    "t<pid>" and closes @worker_write.
#
# NOTE(review): the single-letter prefixes written to @worker_write
# (b, e, f, p, t) are interpreted by whoever reads the other end of that
# pipe, which is not visible here -- confirm their meaning there.
def run
  title  = "puma: cluster worker #{index}: #{master}"
  title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
  $0 = title

  Signal.trap "SIGINT", "IGNORE"
  Signal.trap "SIGCHLD", "DEFAULT"

  # Watchdog: blocks until @check_pipe becomes readable, which this code
  # treats as the parent having died, then exits immediately. exit! skips
  # at_exit handlers and the ensure clause at the bottom of this method.
  Thread.new do
    Puma.set_thread_name "wrkr check"
    @check_pipe.wait_readable
    log "! Detected parent died, dying"
    exit! 1
  end

  # If we're not running under a Bundler context, then
  # report the info about the context we will be using
  if !ENV['BUNDLE_GEMFILE']
    if File.exist?("Gemfile")
      log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
    elsif File.exist?("gems.rb")
      log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
    end
  end

  # Invoke any worker boot hooks so they can get
  # things in shape before booting the app.
  @config.run_hooks(:before_worker_boot, index, @log_writer, @hook_data)

  # Reuse the server handed to the constructor, otherwise build one.
  # Any failure (including non-StandardError exceptions) is logged and the
  # process exits with status 1.
  begin
  server = @server ||= start_server
  rescue Exception => e
    log "! Unable to start worker"
    log e
    log e.backtrace.join("\n    ")
    exit 1
  end

  # Control queue for the main loop below. Pre-loaded with [true, false]:
  # the loop pops true (run the server once), then pops false (leave).
  restart_server = Queue.new << true << false

  fork_worker = @options[:fork_worker] && index == 0

  if fork_worker
    # In this mode the queue starts empty, so the main loop blocks on pop
    # until the thread below or the SIGTERM handler pushes something.
    restart_server.clear
    worker_pids = []
    # Drop every recorded child pid that Process.wait reports as finished
    # (or for which waiting raises). reject! returns nil when nothing was
    # removed, so wakeup! only fires when at least one pid was dropped.
    Signal.trap "SIGCHLD" do
      wakeup! if worker_pids.reject! do |p|
        Process.wait(p, Process::WNOHANG) rescue true
      end
    end

    # Services line-based integer commands read from @fork_pipe until
    # gets returns nil (end of stream).
    Thread.new do
      Puma.set_thread_name "wrkr fork"
      while (idx = @fork_pipe.gets)
        idx = idx.to_i
        if idx == -1 # stop server
          # Only act if a run is currently queued; empties the queue so
          # the main loop waits after the server thread finishes.
          if restart_server.length > 0
            restart_server.clear
            server.begin_restart(true)
            @config.run_hooks(:before_refork, nil, @log_writer, @hook_data)
          end
        elsif idx == 0 # restart server
          # Same [true, false] pair as the initial queue contents.
          restart_server << true << false
        else # fork worker
          # Remember the child's pid for the SIGCHLD handler above, then
          # report it on @worker_write; write failures are ignored.
          worker_pids << pid = spawn_worker(idx)
          @worker_write << "f#{pid}:#{idx}\n" rescue nil
        end
      end
    end
  end

  # On SIGTERM: write "e<pid>" (best effort), discard any queued runs,
  # stop the server, and queue a single false so the main loop exits once
  # the current server thread has been joined.
  Signal.trap "SIGTERM" do
    @worker_write << "e#{Process.pid}\n" rescue nil
    restart_server.clear
    server.stop
    restart_server << false
  end

  # Write "b<pid>:<index>" before entering the main loop. If the pipe is
  # unusable the worker gives up; the ensure clause still runs on return.
  begin
    @worker_write << "b#{Process.pid}:#{index}\n"
  rescue SystemCallError, IOError
    Puma::Util.purge_interrupt_queue
    STDERR.puts "Master seems to have exited, exiting."
    return
  end

  # Main loop: each truthy value popped runs the server once; a false
  # value ends the loop. pop blocks while the queue is empty.
  while restart_server.pop
    server_thread = server.run

    if @log_writer.debug? && index == 0
      debug_loaded_extensions "Loaded Extensions - worker 0:"
    end

    # Created on the first iteration only (||=) and reused across server
    # restarts. Writes one "p<pid>{...}" line with the server's counters,
    # then sleeps @options[:worker_check_interval], until a write raises
    # IOError.
    stat_thread ||= Thread.new(@worker_write) do |io|
      Puma.set_thread_name "stat pld"
      base_payload = "p#{Process.pid}"

      while true
        begin
          # Each counter falls back to 0 when the server returns nil.
          b = server.backlog || 0
          r = server.running || 0
          t = server.pool_capacity || 0
          m = server.max_threads || 0
          rc = server.requests_count || 0
          payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m}, "requests_count": #{rc} }\n!
          io << payload
        rescue IOError
          Puma::Util.purge_interrupt_queue
          break
        end
        sleep @options[:worker_check_interval]
      end
    end
    # Wait for this run of the server to finish before popping again.
    server_thread.join
  end

  # Invoke any worker shutdown hooks so they can prevent the worker
  # exiting until any background operations are completed
  @config.run_hooks(:before_worker_shutdown, index, @log_writer, @hook_data)

  # If the server reports that its idle timeout was reached, send SIGTERM
  # to the master as well.
  Process.kill "SIGTERM", master if server.idle_timeout_reached
ensure
  # Always write "t<pid>" (best effort) and close our end of the pipe.
  @worker_write << "t#{Process.pid}\n" rescue nil
  @worker_write.close
end