Class: Preforker

Inherits:
Object
  • Object
show all
Defined in:
lib/preforker.rb,
lib/preforker/util.rb,
lib/preforker/worker.rb,
lib/preforker/pid_manager.rb,
lib/preforker/signal_processor.rb

Defined Under Namespace

Classes: PidManager, SignalProcessor, TmpIO, Util, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}, &worker_block) ⇒ Preforker

Returns a new instance of Preforker.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/preforker.rb', line 12

def initialize(options = {}, &worker_block)
  @app_name = options[:app_name] || "Preforker"
  default_log_file = "#{@app_name.downcase}.log"
  @options = {
    :timeout => 5,
    :workers => 10,
    :app_name => "Preforker",
    :stderr_path => default_log_file,
    :stderr_path => default_log_file
  }.merge(options)

  @logger = @options[:logger] || Logger.new(default_log_file)

  @timeout = @options[:timeout]
  @number_of_workers = @options[:workers]
  @worker_block = worker_block || lambda {}

  @workers = {}
  $0 = "#@app_name Master"
end

Instance Attribute Details

#app_nameObject (readonly)

Returns the value of attribute app_name.



9
10
11
# File 'lib/preforker.rb', line 9

def app_name
  @app_name
end

#loggerObject (readonly)

Returns the value of attribute logger.



9
10
11
# File 'lib/preforker.rb', line 9

def logger
  @logger
end

#number_of_workersObject

Returns the value of attribute number_of_workers.



10
11
12
# File 'lib/preforker.rb', line 10

def number_of_workers
  @number_of_workers
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



9
10
11
# File 'lib/preforker.rb', line 9

def timeout
  @timeout
end

Instance Method Details

#close_resources_worker_wont_useObject



89
90
91
92
93
# File 'lib/preforker.rb', line 89

def close_resources_worker_wont_use
  @signal_processor.reset
  @workers.values.each { |other| other.tmp.close rescue nil }
  @workers.clear
end

#launch(&block) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/preforker.rb', line 63

def launch(&block)
   puts "Starting server"

   ready_read, ready_write = IO.pipe
   [ready_read, ready_write].each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }

   fork do
     ready_read.close

     Process.setsid
     fork do
       block.call(ready_write)
     end
   end

   ready_write.close
   master_pid = (ready_read.readpartial(16) rescue nil).to_i
   ready_read.close
   if master_pid <= 1
     warn "Master failed to start, check stderr log for details"
     exit!(1)
   else
     puts "Server started successfuly"
   end
end

#maintain_worker_countObject



137
138
139
140
141
142
143
144
# File 'lib/preforker.rb', line 137

def maintain_worker_count
  number_of_missing_workers = @number_of_workers - @workers.size
  return if number_of_missing_workers == 0
  return spawn_missing_workers(number_of_missing_workers) if number_of_missing_workers > 0
  @workers.values[0..(-number_of_missing_workers - 1)].each do |unneeded_worker|
    signal_worker(:QUIT, unneeded_worker.pid) rescue nil
  end
end

#murder_lazy_workersObject

forcibly terminate all workers that haven’t checked in in timeout seconds. The timeout is implemented using an unlinked File shared between the parent process and each worker. The worker runs File#chmod to modify the ctime of the File. If the ctime is stale for >timeout seconds, then we’ll kill the corresponding worker.



152
153
154
155
156
157
158
159
160
161
# File 'lib/preforker.rb', line 152

def murder_lazy_workers
  @workers.dup.each_pair do |worker_pid, worker|
    stat = worker.tmp.stat
    # skip workers that disable fchmod or have never fchmod-ed
    next if stat.mode == 0100600
    next if (diff = (Time.now - stat.ctime)) <= @timeout
    logger.error "Worker=#{worker_pid} timeout (#{diff}s > #{@timeout}s), killing"
    signal_worker(:KILL, worker_pid) # take no prisoners for timeout violations
  end
end

#quit(graceful = true) ⇒ Object



106
107
108
109
# File 'lib/preforker.rb', line 106

def quit(graceful = true)
  stop(graceful)
  @pid_manager.unlink
end

#reap_all_workersObject



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/preforker.rb', line 111

def reap_all_workers
  begin
    loop do
      worker_pid, status = Process.waitpid2(-1, Process::WNOHANG)
      break unless worker_pid
      worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
      logger.info "reaped #{status.inspect}"
    end
  rescue Errno::ECHILD
  end
end

#redirect_io(io, path) ⇒ Object



190
191
192
193
# File 'lib/preforker.rb', line 190

def redirect_io(io, path)
  File.open(path, 'ab') { |fp| io.reopen(fp) } if path
  io.sync = true
end

#run(ready_write = nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/preforker.rb', line 33

def run(ready_write = nil)
  $stdin.reopen("/dev/null")
  set_stdout_path(@options[:stdout_path])
  set_stderr_path(@options[:stderr_path])

  logger.info "Master started"

  pid_path = @options[:pid_path] || "./#{@app_name.downcase}.pid"
  @pid_manager = PidManager.new(pid_path)
  @signal_processor = SignalProcessor.new(self)

  spawn_missing_workers do
    ready_write.close if ready_write
  end

  #tell parent we are ready
  if ready_write
    ready_write.syswrite($$.to_s)
    ready_write.close rescue nil
  end

  @signal_processor.start_signal_loop
end

#set_stderr_path(path) ⇒ Object



186
187
188
# File 'lib/preforker.rb', line 186

def set_stderr_path(path)
  redirect_io($stderr, path)
end

#set_stdout_path(path) ⇒ Object



182
183
184
# File 'lib/preforker.rb', line 182

def set_stdout_path(path)
  redirect_io($stdout, path)
end

#signal_each_worker(signal) ⇒ Object

delivers a signal to each worker



174
175
176
# File 'lib/preforker.rb', line 174

def signal_each_worker(signal)
  @workers.keys.each { |worker_pid| signal_worker(signal, worker_pid) }
end

#signal_quitObject



178
179
180
# File 'lib/preforker.rb', line 178

def signal_quit
  signal_worker(:QUIT, @pid_manager.pid)
end

#signal_worker(signal, worker_pid) ⇒ Object

delivers a signal to a worker and fails gracefully if the worker is no longer running.



165
166
167
168
169
170
171
# File 'lib/preforker.rb', line 165

def signal_worker(signal, worker_pid)
  begin
    Process.kill(signal, worker_pid)
  rescue Errno::ESRCH
    worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
  end
end

#spawn_missing_workers(new_workers_count = @number_of_workers, &init_block) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/preforker.rb', line 123

def spawn_missing_workers(new_workers_count = @number_of_workers, &init_block)
  new_workers_count.times do
    worker = Worker.new(@worker_block, self)
    worker_pid = fork do
      close_resources_worker_wont_use
      init_block.call if init_block
      worker.work
    end

    worker.pid = worker_pid
    @workers[worker_pid] = worker
  end
end

#startObject



57
58
59
60
61
# File 'lib/preforker.rb', line 57

def start
  launch do |ready_write|
    run(ready_write)
  end
end

#stop(graceful = true) ⇒ Object

Terminates all workers, but does not exit master process



96
97
98
99
100
101
102
103
104
# File 'lib/preforker.rb', line 96

def stop(graceful = true)
  limit = Time.now + @timeout
  until @workers.empty? || Time.now > limit
    signal_each_worker(graceful ? :QUIT : :TERM)
    sleep(0.1)
    reap_all_workers
  end
  signal_each_worker(:KILL)
end