Class: Preforker
- Inherits:
-
Object
show all
- Defined in:
- lib/preforker.rb,
lib/preforker/util.rb,
lib/preforker/worker.rb,
lib/preforker/pid_manager.rb,
lib/preforker/signal_processor.rb
Defined Under Namespace
Classes: PidManager, SignalProcessor, TmpIO, Util, Worker
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(options = {}, &worker_block) ⇒ Preforker
Returns a new instance of Preforker.
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/preforker.rb', line 12
# Builds a master object. No process is forked here; that happens in #start / #run.
#
# @param options [Hash] settings merged over the defaults below. Recognised keys
#   visible in this file: :timeout (default 5), :workers (default 10),
#   :app_name (default "Preforker"), :stdout_path and :stderr_path (both default
#   to "<downcased app name>.log"), plus :logger and :pid_path (no default entry).
# @param worker_block [Proc] block handed to every Worker; defaults to a no-op lambda.
def initialize(options = {}, &worker_block)
  @app_name = options[:app_name] || "Preforker"
  default_log_file = "#{@app_name.downcase}.log"
  @options = {
    :timeout => 5,
    :workers => 10,
    :app_name => "Preforker",
    # The defaults used to list :stderr_path twice (a duplicated hash key, so the
    # second entry was dead code). #run redirects both streams, so the pair is
    # taken to be stdout + stderr.
    :stdout_path => default_log_file,
    :stderr_path => default_log_file
  }.merge(options)
  # Logger is only instantiated when the caller did not supply one.
  @logger = @options[:logger] || Logger.new(default_log_file)
  @timeout = @options[:timeout]
  @number_of_workers = @options[:workers]
  @worker_block = worker_block || lambda {}
  @workers = {}
  # Rename the process so the master is recognisable in process listings.
  $0 = "#@app_name Master"
end
|
Instance Attribute Details
#app_name ⇒ Object
Returns the value of attribute app_name.
9
10
11
|
# File 'lib/preforker.rb', line 9
# Reader for @app_name, which #initialize sets from options[:app_name]
# (falling back to "Preforker").
def app_name
@app_name
end
|
#logger ⇒ Object
Returns the value of attribute logger.
9
10
11
|
# File 'lib/preforker.rb', line 9
# Reader for @logger: the :logger option if one was given to #initialize,
# otherwise a Logger writing to the default log file.
def logger
@logger
end
|
#number_of_workers ⇒ Object
Returns the value of attribute number_of_workers.
10
11
12
|
# File 'lib/preforker.rb', line 10
# Reader for @number_of_workers, the target worker count taken from the
# :workers option in #initialize (default 10).
def number_of_workers
@number_of_workers
end
|
#timeout ⇒ Object
Returns the value of attribute timeout.
9
10
11
|
# File 'lib/preforker.rb', line 9
# Reader for @timeout, taken from the :timeout option in #initialize (default 5).
# #stop and #murder_lazy_workers compare it against Time differences, i.e. seconds.
def timeout
@timeout
end
|
Instance Method Details
#close_resources_worker_wont_use ⇒ Object
89
90
91
92
93
|
# File 'lib/preforker.rb', line 89
# Called inside a freshly forked worker (see #spawn_missing_workers): resets the
# signal processor, closes the tmp handle of every worker recorded so far and
# empties the worker table.
def close_resources_worker_wont_use
  @signal_processor.reset
  @workers.each_value do |sibling|
    begin
      sibling.tmp.close
    rescue StandardError
      # Best effort: a handle that cannot be closed is simply skipped.
      nil
    end
  end
  @workers.clear
end
|
#launch(&block) ⇒ Object
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/preforker.rb', line 63
# Daemonizes: forks twice (with a setsid in between) and runs +block+ in the
# grandchild, passing it the write end of a pipe. The calling process then waits
# for the grandchild to write its pid into that pipe.
#
# If nothing usable arrives (pipe closed without data, or a pid <= 1) the caller
# is terminated with exit!(1); otherwise a success message is printed.
#
# @param block [Proc] receives the pipe's write end; expected to write a pid to it.
# @return [nil]
def launch(&block)
  puts "Starting server"
  ready_read, ready_write = IO.pipe
  [ready_read, ready_write].each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }

  intermediate_pid = fork do
    ready_read.close
    Process.setsid
    fork do
      block.call(ready_write)
    end
  end
  # The intermediate child exits right after forking the grandchild. Detach it so
  # its exit status is collected and it does not linger as a zombie.
  Process.detach(intermediate_pid)

  ready_write.close
  master_pid = begin
    ready_read.readpartial(16).to_i
  rescue StandardError
    # readpartial raises (e.g. EOFError) when the pipe closes without data.
    0
  end
  ready_read.close

  if master_pid <= 1
    warn "Master failed to start, check stderr log for details"
    exit!(1)
  else
    puts "Server started successfully"
  end
end
|
#maintain_worker_count ⇒ Object
137
138
139
140
141
142
143
144
|
# File 'lib/preforker.rb', line 137
# Brings the worker table back to @number_of_workers entries: spawns the missing
# workers when there are too few, or sends QUIT to the first surplus workers in
# the table when there are too many. Does nothing when the count already matches.
def maintain_worker_count
  shortfall = @number_of_workers - @workers.size
  if shortfall > 0
    spawn_missing_workers(shortfall)
  elsif shortfall < 0
    @workers.values.first(-shortfall).each do |surplus_worker|
      begin
        signal_worker(:QUIT, surplus_worker.pid)
      rescue StandardError
        # Failing to signal one worker must not stop the others being signalled.
        nil
      end
    end
  end
end
|
#murder_lazy_workers ⇒ Object
Forcibly terminates every worker that has not checked in within timeout seconds. The timeout is implemented using an unlinked File shared between the parent process and each worker: the worker runs File#chmod to update the File's ctime, and if the ctime has been stale for more than timeout seconds, the corresponding worker is killed.
152
153
154
155
156
157
158
159
160
161
|
# File 'lib/preforker.rb', line 152
# Sends KILL to every worker whose tmp file has a ctime older than @timeout
# seconds, logging an error for each one.
def murder_lazy_workers
  # Iterate over a copy: signal_worker may delete entries from @workers.
  @workers.dup.each_pair do |pid, worker|
    heartbeat = worker.tmp.stat
    # Mode 0100600 is skipped outright.
    # NOTE(review): presumably the tmp file's initial mode, meaning the worker
    # has not checked in yet - confirm against Worker/TmpIO.
    next if heartbeat.mode == 0o100600

    idle_for = Time.now - heartbeat.ctime
    next unless idle_for > @timeout

    logger.error "Worker=#{pid} timeout (#{idle_for}s > #{@timeout}s), killing"
    signal_worker(:KILL, pid)
  end
end
|
#quit(graceful = true) ⇒ Object
106
107
108
109
|
# File 'lib/preforker.rb', line 106
# Stops all workers via #stop, then calls unlink on the pid manager.
# NOTE(review): unlink presumably removes the pid file - confirm against PidManager.
#
# @param graceful [Boolean] forwarded unchanged to #stop.
def quit(graceful = true)
stop(graceful)
@pid_manager.unlink
end
|
#reap_all_workers ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/preforker.rb', line 111
# Collects every child that has already exited, without blocking. Each reaped
# pid is removed from the worker table, its tmp handle is closed (best effort)
# and its exit status is logged.
#
# @return [nil]
def reap_all_workers
  loop do
    reaped_pid, exit_status = Process.waitpid2(-1, Process::WNOHANG)
    # With WNOHANG a nil pid means no further child has exited yet.
    return unless reaped_pid

    begin
      finished = @workers.delete(reaped_pid)
      finished.tmp.close if finished
    rescue StandardError
      nil
    end
    logger.info "reaped #{exit_status.inspect}"
  end
rescue Errno::ECHILD
  # Raised by waitpid2 when there are no child processes at all; nothing to reap.
end
|
#redirect_io(io, path) ⇒ Object
190
191
192
193
|
# File 'lib/preforker.rb', line 190
# Points +io+ at the file at +path+ (opened in binary append mode) when a path
# is given, and always switches +io+ to synchronous writes.
#
# @param io [IO] stream to redirect, e.g. $stdout or $stderr.
# @param path [String, nil] target file; nil/false leaves the stream where it is.
# @return [true]
def redirect_io(io, path)
  if path
    target = File.open(path, 'ab')
    begin
      io.reopen(target)
    ensure
      # io holds its own descriptor after reopen, so the helper handle can go.
      target.close
    end
  end
  io.sync = true
end
|
#run(ready_write = nil) ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/preforker.rb', line 33
# Master entry point: detaches stdin, redirects stdout/stderr according to the
# options, creates the pid manager and signal processor, spawns the initial
# workers and finally enters the signal loop.
#
# @param ready_write [IO, nil] optional write end of the pipe created by #launch.
#   When given, the master's pid is written to it once the workers are forked.
def run(ready_write = nil)
  $stdin.reopen("/dev/null")
  set_stdout_path(@options[:stdout_path])
  set_stderr_path(@options[:stderr_path])
  logger.info "Master started"

  @pid_manager = PidManager.new(@options[:pid_path] || "./#{@app_name.downcase}.pid")
  @signal_processor = SignalProcessor.new(self)

  # Each worker closes its inherited copy of the pipe before starting work.
  spawn_missing_workers { ready_write.close if ready_write }

  if ready_write
    ready_write.syswrite(Process.pid.to_s)
    begin
      ready_write.close
    rescue StandardError
      nil
    end
  end

  @signal_processor.start_signal_loop
end
|
#set_stderr_path(path) ⇒ Object
186
187
188
|
# File 'lib/preforker.rb', line 186
# Redirects $stderr to the file at +path+ via #redirect_io.
#
# @param path [String, nil] log file path; #redirect_io skips the redirect when it is nil.
def set_stderr_path(path)
redirect_io($stderr, path)
end
|
#set_stdout_path(path) ⇒ Object
182
183
184
|
# File 'lib/preforker.rb', line 182
# Redirects $stdout to the file at +path+ via #redirect_io.
#
# @param path [String, nil] log file path; #redirect_io skips the redirect when it is nil.
def set_stdout_path(path)
redirect_io($stdout, path)
end
|
#signal_each_worker(signal) ⇒ Object
Delivers a signal to each worker.
174
175
176
|
# File 'lib/preforker.rb', line 174
# Sends +signal+ to every pid currently in the worker table.
#
# @param signal [Symbol, String, Integer] forwarded unchanged to #signal_worker.
def signal_each_worker(signal)
  # Walk a snapshot of the keys: signal_worker may delete entries as it goes.
  @workers.keys.each do |pid|
    signal_worker(signal, pid)
  end
end
|
#signal_quit ⇒ Object
178
179
180
|
# File 'lib/preforker.rb', line 178
# Sends QUIT to the pid reported by the pid manager, using #signal_worker.
# NOTE(review): @pid_manager.pid is presumably the master's pid as recorded in
# the pid file - confirm against PidManager.
def signal_quit
signal_worker(:QUIT, @pid_manager.pid)
end
|
#signal_worker(signal, worker_pid) ⇒ Object
Delivers a signal to a worker and fails gracefully if the worker is no longer running.
165
166
167
168
169
170
171
|
# File 'lib/preforker.rb', line 165
# Sends +signal+ to +worker_pid+. If that process no longer exists, the pid is
# dropped from the worker table and its tmp handle is closed (best effort).
#
# @param signal [Symbol, String, Integer] signal passed to Process.kill.
# @param worker_pid [Integer] pid of the target process.
def signal_worker(signal, worker_pid)
  Process.kill(signal, worker_pid)
rescue Errno::ESRCH
  begin
    stale = @workers.delete(worker_pid)
    stale.tmp.close if stale
  rescue StandardError
    nil
  end
end
|
#spawn_missing_workers(new_workers_count = @number_of_workers, &init_block) ⇒ Object
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/preforker.rb', line 123
# Forks +new_workers_count+ workers and records each one in the worker table
# under its pid.
#
# In the child: the master-side resources are released, the optional
# +init_block+ runs, then the worker starts working.
#
# @param new_workers_count [Integer] how many workers to fork (defaults to the
#   configured worker count).
# @param init_block [Proc, nil] optional hook run in each child before work begins.
def spawn_missing_workers(new_workers_count = @number_of_workers, &init_block)
  new_workers_count.times do
    fresh_worker = Worker.new(@worker_block, self)
    child_pid = fork do
      close_resources_worker_wont_use
      init_block&.call
      fresh_worker.work
    end
    fresh_worker.pid = child_pid
    @workers[child_pid] = fresh_worker
  end
end
|
#start ⇒ Object
57
58
59
60
61
|
# File 'lib/preforker.rb', line 57
# Starts the server as a daemon: #launch does the forking and hands the
# readiness pipe to #run inside the daemonized process.
def start
  launch { |ready_write| run(ready_write) }
end
|
#stop(graceful = true) ⇒ Object
Terminates all workers, but does not exit master process
96
97
98
99
100
101
102
103
104
|
# File 'lib/preforker.rb', line 96
# Terminates all workers without exiting the master. Workers are signalled
# (QUIT when graceful, TERM otherwise) and reaped every 0.1s until none remain
# or @timeout seconds have passed; whatever is still in the table then gets KILL.
#
# @param graceful [Boolean] selects QUIT (true) or TERM (false) for the polite phase.
def stop(graceful = true)
  polite_signal = graceful ? :QUIT : :TERM
  deadline = Time.now + @timeout
  loop do
    break if @workers.empty? || Time.now > deadline

    signal_each_worker(polite_signal)
    sleep(0.1)
    reap_all_workers
  end
  signal_each_worker(:KILL)
end
|