Class: Preforker
- Inherits:
-
Object
show all
- Defined in:
- lib/preforker.rb,
lib/preforker/util.rb,
lib/preforker/worker.rb,
lib/preforker/pid_manager.rb,
lib/preforker/signal_processor.rb
Defined Under Namespace
Classes: PidManager, SignalProcessor, TmpIO, Util, Worker
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(options = {}, &worker_block) ⇒ Preforker
Returns a new instance of Preforker.
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/preforker.rb', line 12
def initialize(options = {}, &worker_block)
@app_name = options[:app_name] || "Preforker"
default_log_file = "#{@app_name.downcase}.log"
@options = {
:timeout => 5,
:workers => 10,
:app_name => "Preforker",
:stderr_path => default_log_file,
:stderr_path => default_log_file
}.merge(options)
@logger = @options[:logger] || Logger.new(default_log_file)
@timeout = @options[:timeout]
@number_of_workers = @options[:workers]
@worker_block = worker_block || lambda {}
@workers = {}
$0 = "#@app_name Master"
end
|
Instance Attribute Details
#app_name ⇒ Object
Returns the value of attribute app_name.
9
10
11
|
# File 'lib/preforker.rb', line 9
def app_name
@app_name
end
|
#logger ⇒ Object
Returns the value of attribute logger.
9
10
11
|
# File 'lib/preforker.rb', line 9
def logger
@logger
end
|
#number_of_workers ⇒ Object
Returns the value of attribute number_of_workers.
10
11
12
|
# File 'lib/preforker.rb', line 10
def number_of_workers
@number_of_workers
end
|
#timeout ⇒ Object
Returns the value of attribute timeout.
9
10
11
|
# File 'lib/preforker.rb', line 9
def timeout
@timeout
end
|
Instance Method Details
#close_resources_worker_wont_use ⇒ Object
84
85
86
87
88
|
# File 'lib/preforker.rb', line 84
def close_resources_worker_wont_use
@signal_processor.reset
@workers.values.each { |other| other.tmp.close rescue nil }
@workers.clear
end
|
#launch(&block) ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/preforker.rb', line 57
def launch(&block)
puts "Starting server"
ready_read, ready_write = IO.pipe
[ready_read, ready_write].each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
fork do
ready_read.close
Process.setsid
fork do
block.call(ready_write)
end
end
ready_write.close
master_pid = (ready_read.readpartial(16) rescue nil).to_i
ready_read.close
if master_pid <= 1
warn "Master failed to start, check stderr log for details"
exit!(1)
else
puts "Server started successfuly"
exit(0)
end
end
|
#maintain_worker_count ⇒ Object
132
133
134
135
136
137
138
139
|
# File 'lib/preforker.rb', line 132
def maintain_worker_count
number_of_missing_workers = @number_of_workers - @workers.size
return if number_of_missing_workers == 0
return spawn_missing_workers(number_of_missing_workers) if number_of_missing_workers > 0
@workers.values[0..(-number_of_missing_workers - 1)].each do |unneeded_worker|
signal_worker(:QUIT, unneeded_worker.pid) rescue nil
end
end
|
#murder_lazy_workers ⇒ Object
forcibly terminate all workers that haven’t checked in in timeout seconds. The timeout is implemented using an unlinked File shared between the parent process and each worker. The worker runs File#chmod to modify the ctime of the File. If the ctime is stale for >timeout seconds, then we’ll kill the corresponding worker.
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/preforker.rb', line 147
def murder_lazy_workers
@workers.dup.each_pair do |worker_pid, worker|
stat = worker.tmp.stat
next if stat.mode == 0100600
next if (diff = (Time.now - stat.ctime)) <= @timeout
logger.error "Worker=#{worker_pid} timeout (#{diff}s > #{@timeout}s), killing"
signal_worker(:KILL, worker_pid) end
end
|
#quit(graceful = true) ⇒ Object
101
102
103
104
|
# File 'lib/preforker.rb', line 101
def quit(graceful = true)
stop(graceful)
@pid_manager.unlink
end
|
#reap_all_workers ⇒ Object
106
107
108
109
110
111
112
113
114
115
116
|
# File 'lib/preforker.rb', line 106
def reap_all_workers
begin
loop do
worker_pid, status = Process.waitpid2(-1, Process::WNOHANG)
break unless worker_pid
worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
logger.info "reaped #{status.inspect}"
end
rescue Errno::ECHILD
end
end
|
#redirect_io(io, path) ⇒ Object
185
186
187
188
|
# File 'lib/preforker.rb', line 185
def redirect_io(io, path)
File.open(path, 'ab') { |fp| io.reopen(fp) } if path
io.sync = true
end
|
#set_stderr_path(path) ⇒ Object
181
182
183
|
# File 'lib/preforker.rb', line 181
def set_stderr_path(path)
redirect_io($stderr, path)
end
|
#set_stdout_path(path) ⇒ Object
177
178
179
|
# File 'lib/preforker.rb', line 177
def set_stdout_path(path)
redirect_io($stdout, path)
end
|
#signal_each_worker(signal) ⇒ Object
delivers a signal to each worker
169
170
171
|
# File 'lib/preforker.rb', line 169
def signal_each_worker(signal)
@workers.keys.each { |worker_pid| signal_worker(signal, worker_pid) }
end
|
#signal_quit ⇒ Object
173
174
175
|
# File 'lib/preforker.rb', line 173
def signal_quit
signal_worker(:QUIT, @pid_manager.pid)
end
|
#signal_worker(signal, worker_pid) ⇒ Object
delivers a signal to a worker and fails gracefully if the worker is no longer running.
160
161
162
163
164
165
166
|
# File 'lib/preforker.rb', line 160
def signal_worker(signal, worker_pid)
begin
Process.kill(signal, worker_pid)
rescue Errno::ESRCH
worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
end
end
|
#spawn_missing_workers(new_workers_count = @number_of_workers, &init_block) ⇒ Object
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/preforker.rb', line 118
def spawn_missing_workers(new_workers_count = @number_of_workers, &init_block)
new_workers_count.times do
worker = Worker.new(@worker_block, self)
worker_pid = fork do
close_resources_worker_wont_use
init_block.call if init_block
worker.work
end
worker.pid = worker_pid
@workers[worker_pid] = worker
end
end
|
#start ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/preforker.rb', line 33
def start
launch do |ready_write|
$stdin.reopen("/dev/null")
set_stdout_path(@options[:stdout_path])
set_stderr_path(@options[:stderr_path])
logger.info "Master started"
pid_path = @options[:pid_path] || "./#{@app_name.downcase}.pid"
@pid_manager = PidManager.new(pid_path)
@signal_processor = SignalProcessor.new(self)
spawn_missing_workers do
ready_write.close
end
ready_write.syswrite($$.to_s)
ready_write.close rescue nil
@signal_processor.start_signal_loop
end
end
|
#stop(graceful = true) ⇒ Object
Terminates all workers, but does not exit master process
91
92
93
94
95
96
97
98
99
|
# File 'lib/preforker.rb', line 91
def stop(graceful = true)
limit = Time.now + @timeout
until @workers.empty? || Time.now > limit
signal_each_worker(graceful ? :QUIT : :TERM)
sleep(0.1)
reap_all_workers
end
signal_each_worker(:KILL)
end
|