Class: ServerEngine::ProcessManager

Inherits:
Object
  • Object
show all
Defined in:
lib/serverengine/process_manager.rb

Defined Under Namespace

Classes: AlreadyClosedError, HeartbeatThread, Monitor, Target, TickThread

Constant Summary collapse

CONFIG_PARAMS =
{
  heartbeat_interval: 1,
  heartbeat_timeout: 180,
  graceful_kill_interval: 15,
  graceful_kill_interval_increment: 10,
  graceful_kill_timeout: 600,
  immediate_kill_interval: 10,
  immediate_kill_interval_increment: 10,
  immediate_kill_timeout: 600,
}
HEARTBEAT_MESSAGE =
[0].pack('C')

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ ProcessManager

Returns a new instance of ProcessManager.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/serverengine/process_manager.rb', line 24

def initialize(config={})
  @monitors = []
  @rpipes = {}
  @heartbeat_time = {}

  @cloexec_mode = config[:cloexec_mode]

  @graceful_kill_signal = config[:graceful_kill_signal] || :TERM
  @immediate_kill_signal = config[:immediate_kill_signal] || :QUIT

  @auto_tick = !!config.fetch(:auto_tick, true)
  @auto_tick_interval = config[:auto_tick_interval] || 1

  @enable_heartbeat = !!config[:enable_heartbeat]
  if ServerEngine.windows?
    # heartbeat is not supported on Windows platform. See also spawn method.
    @enable_heartbeat = false
  end
  @auto_heartbeat = !!config.fetch(:auto_heartbeat, true)

  case op = config[:on_heartbeat_error]
  when nil
    @heartbeat_error_proc = lambda {|t| }
  when Proc
    @heartbeat_error_proc = op
  when :abort
    @heartbeat_error_proc = lambda {|t| exit 1 }
  else
    raise ArgumentError, "unexpected :on_heartbeat_error option (expected Proc, true or false but got #{op.class})"
  end

  configure(config)

  @closed = false
  @read_buffer = ''

  if @auto_tick
    TickThread.new(@auto_tick_interval, &method(:tick))
  end
end

Instance Attribute Details

#auto_heartbeatObject (readonly)

Returns the value of attribute auto_heartbeat.



71
72
73
# File 'lib/serverengine/process_manager.rb', line 71

def auto_heartbeat
  @auto_heartbeat
end

#auto_tickObject (readonly)

Returns the value of attribute auto_tick.



70
71
72
# File 'lib/serverengine/process_manager.rb', line 70

def auto_tick
  @auto_tick
end

#auto_tick_intervalObject (readonly)

Returns the value of attribute auto_tick_interval.



70
71
72
# File 'lib/serverengine/process_manager.rb', line 70

def auto_tick_interval
  @auto_tick_interval
end

#cloexec_modeObject

Returns the value of attribute cloexec_mode.



67
68
69
# File 'lib/serverengine/process_manager.rb', line 67

def cloexec_mode
  @cloexec_mode
end

#command_senderObject

Returns the value of attribute command_sender.



73
74
75
# File 'lib/serverengine/process_manager.rb', line 73

def command_sender
  @command_sender
end

#enable_heartbeatObject (readonly)

Returns the value of attribute enable_heartbeat.



71
72
73
# File 'lib/serverengine/process_manager.rb', line 71

def enable_heartbeat
  @enable_heartbeat
end

#graceful_kill_signalObject (readonly)

Returns the value of attribute graceful_kill_signal.



69
70
71
# File 'lib/serverengine/process_manager.rb', line 69

def graceful_kill_signal
  @graceful_kill_signal
end

#immediate_kill_signalObject (readonly)

Returns the value of attribute immediate_kill_signal.



69
70
71
# File 'lib/serverengine/process_manager.rb', line 69

def immediate_kill_signal
  @immediate_kill_signal
end

#loggerObject

Returns the value of attribute logger.



65
66
67
# File 'lib/serverengine/process_manager.rb', line 65

def logger
  @logger
end

Instance Method Details

#closeObject



234
235
236
237
238
# File 'lib/serverengine/process_manager.rb', line 234

def close
  @closed = true
  @rpipes.keys.each {|m| m.close }
  nil
end

#configure(config, opts = {}) ⇒ Object



95
96
97
98
99
100
# File 'lib/serverengine/process_manager.rb', line 95

def configure(config, opts={})
  prefix = opts[:prefix] || ""
  CONFIG_PARAMS.keys.each {|key|
    send("#{key}=", config[:"#{prefix}#{key}"])
  }
end

#fork(&block) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/serverengine/process_manager.rb', line 117

def fork(&block)
  if ServerEngine.windows?
    raise NotImplementedError, "fork is not available on this platform. Please use spawn (worker_type: 'spawn')."
  end

  rpipe, wpipe = new_pipe_pair

  begin
    pid = Process.fork do
      self.close
      begin
        t = Target.new(wpipe)
        if @enable_heartbeat && @auto_heartbeat
          HeartbeatThread.new(@heartbeat_interval, t, @heartbeat_error_proc)
        end

        block.call(t)
        exit! 0

      rescue
        ServerEngine.dump_uncaught_error($!)
      ensure
        exit! 1
      end
    end

    m = Monitor.new(pid, monitor_options)

    @monitors << m
    @rpipes[rpipe] = m
    rpipe = nil

    return m

  ensure
    wpipe.close
    rpipe.close if rpipe
  end
end

#monitor_optionsObject



102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/serverengine/process_manager.rb', line 102

def monitor_options
  {
    enable_heartbeat: @enable_heartbeat,
    heartbeat_timeout: @heartbeat_timeout,
    graceful_kill_signal: @graceful_kill_signal,
    graceful_kill_timeout: @graceful_kill_timeout,
    graceful_kill_interval: @graceful_kill_interval,
    graceful_kill_interval_increment: @graceful_kill_interval_increment,
    immediate_kill_signal: @immediate_kill_signal,
    immediate_kill_timeout: @immediate_kill_timeout,
    immediate_kill_interval: @immediate_kill_interval,
    immediate_kill_interval_increment: @immediate_kill_interval_increment,
  }
end

#new_pipe_pairObject



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/serverengine/process_manager.rb', line 213

def new_pipe_pair
  rpipe, wpipe = IO.pipe

  if Fcntl.const_defined?(:F_SETFD) && Fcntl.const_defined?(:FD_CLOEXEC)
    case @cloexec_mode
    when :target_only
      wpipe.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    when :monitor_only
      rpipe.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    else
      rpipe.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
      wpipe.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    end
  end

  rpipe.sync = true
  wpipe.sync = true

  return rpipe, wpipe
end

#spawn(*args) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/serverengine/process_manager.rb', line 157

def spawn(*args)
  if args.first.is_a?(Hash)
    env = args.shift.dup
  else
    env = {}
  end

  if args.last.is_a?(Hash)
    options = args.pop.dup
  else
    options = {}
  end

  # pipe is necessary even if @enable_heartbeat == false because
  # parent process detects shutdown of a child process using it
  begin
    unless ServerEngine.windows?
      # heartbeat is not supported on Windows platform
      rpipe, wpipe = new_pipe_pair
      options[[wpipe.fileno]] = wpipe
      if @enable_heartbeat
        env['SERVERENGINE_HEARTBEAT_PIPE'] = wpipe.fileno.to_s
      end
    end

    command_sender_pipe = nil
    if @command_sender == "pipe"
      inpipe, command_sender_pipe = IO.pipe
      command_sender_pipe.sync = true
      command_sender_pipe.binmode
      options[:in] = inpipe
    end
    env['SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN'] = SocketManager::INTERNAL_TOKEN
    pid = Process.spawn(env, *args, options)
    if @command_sender == "pipe"
      inpipe.close
    end

    m = Monitor.new(pid, monitor_options)
    m.command_sender_pipe = command_sender_pipe

    @monitors << m

    unless ServerEngine.windows?
      @rpipes[rpipe] = m
      rpipe = nil
    end

    return m

  ensure
    wpipe.close if wpipe
    rpipe.close if rpipe
  end
end

#tick(blocking_timeout = 0) ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/serverengine/process_manager.rb', line 240

def tick(blocking_timeout=0)
  if @closed
    raise AlreadyClosedError.new
  end

  if @rpipes.empty?
    sleep blocking_timeout if blocking_timeout > 0
    return nil
  end

  if ServerEngine.windows?
    raise "heartbeat is not supported on Windows platform. @rpipes must be empty."
  end

  time = Time.now
  ready_pipes, _, _ = IO.select(@rpipes.keys, nil, nil, blocking_timeout)

  if ready_pipes
    ready_pipes.each do |r|
      begin
        r.read_nonblock(1024, @read_buffer)
      rescue Errno::EAGAIN, Errno::EINTR
        next
      rescue #EOFError
        m = @rpipes.delete(r)
        m.start_immediate_stop!
        r.close rescue nil
        next
      end

      if m = @rpipes[r]
        m.last_heartbeat_time = time
      end
    end
  end

  @monitors.delete_if {|m|
    !m.tick(time)
  }

  nil
end