Class: Preforker

Inherits:
Object
  • Object
show all
Defined in:
lib/preforker.rb,
lib/preforker/util.rb,
lib/preforker/worker.rb,
lib/preforker/pid_manager.rb,
lib/preforker/signal_processor.rb

Defined Under Namespace

Classes: PidManager, SignalProcessor, TmpIO, Util, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}, &worker_block) ⇒ Preforker

Returns a new instance of Preforker.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/preforker.rb', line 12

def initialize(options = {}, &worker_block)
  @app_name = options[:app_name] || "Preforker"
  default_log_file = "#{@app_name.downcase}.log"
  @options = {
    :timeout => 5,
    :workers => 10,
    :app_name => "Preforker",
    :stderr_path => default_log_file,
    :stderr_path => default_log_file
  }.merge(options)

  @logger = @options[:logger] || Logger.new(default_log_file)

  @timeout = @options[:timeout]
  @number_of_workers = @options[:workers]
  @worker_block = worker_block || lambda {}

  @workers = {}
  $0 = "#@app_name Master"
end

Instance Attribute Details

#app_nameObject (readonly)

Returns the value of attribute app_name.



9
10
11
# File 'lib/preforker.rb', line 9

def app_name
  @app_name
end

#loggerObject (readonly)

Returns the value of attribute logger.



9
10
11
# File 'lib/preforker.rb', line 9

def logger
  @logger
end

#number_of_workersObject

Returns the value of attribute number_of_workers.



10
11
12
# File 'lib/preforker.rb', line 10

def number_of_workers
  @number_of_workers
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



9
10
11
# File 'lib/preforker.rb', line 9

def timeout
  @timeout
end

Instance Method Details

#close_resources_worker_wont_useObject



84
85
86
87
88
# File 'lib/preforker.rb', line 84

def close_resources_worker_wont_use
  @signal_processor.reset
  @workers.values.each { |other| other.tmp.close rescue nil }
  @workers.clear
end

#launch(&block) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/preforker.rb', line 57

def launch(&block)
   puts "Starting server"

   ready_read, ready_write = IO.pipe
   [ready_read, ready_write].each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }

   fork do
     ready_read.close

     Process.setsid
     fork do
       block.call(ready_write)
     end
   end

   ready_write.close
   master_pid = (ready_read.readpartial(16) rescue nil).to_i
   ready_read.close
   if master_pid <= 1
     warn "Master failed to start, check stderr log for details"
     exit!(1)
   else
     puts "Server started successfuly"
     exit(0)
   end
end

#maintain_worker_countObject



132
133
134
135
136
137
138
139
# File 'lib/preforker.rb', line 132

def maintain_worker_count
  number_of_missing_workers = @number_of_workers - @workers.size
  return if number_of_missing_workers == 0
  return spawn_missing_workers(number_of_missing_workers) if number_of_missing_workers > 0
  @workers.values[0..(-number_of_missing_workers - 1)].each do |unneeded_worker|
    signal_worker(:QUIT, unneeded_worker.pid) rescue nil
  end
end

#murder_lazy_workersObject

forcibly terminate all workers that haven’t checked in in timeout seconds. The timeout is implemented using an unlinked File shared between the parent process and each worker. The worker runs File#chmod to modify the ctime of the File. If the ctime is stale for >timeout seconds, then we’ll kill the corresponding worker.



147
148
149
150
151
152
153
154
155
156
# File 'lib/preforker.rb', line 147

def murder_lazy_workers
  @workers.dup.each_pair do |worker_pid, worker|
    stat = worker.tmp.stat
    # skip workers that disable fchmod or have never fchmod-ed
    next if stat.mode == 0100600
    next if (diff = (Time.now - stat.ctime)) <= @timeout
    logger.error "Worker=#{worker_pid} timeout (#{diff}s > #{@timeout}s), killing"
    signal_worker(:KILL, worker_pid) # take no prisoners for timeout violations
  end
end

#quit(graceful = true) ⇒ Object



101
102
103
104
# File 'lib/preforker.rb', line 101

def quit(graceful = true)
  stop(graceful)
  @pid_manager.unlink
end

#reap_all_workersObject



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/preforker.rb', line 106

def reap_all_workers
  begin
    loop do
      worker_pid, status = Process.waitpid2(-1, Process::WNOHANG)
      break unless worker_pid
      worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
      logger.info "reaped #{status.inspect}"
    end
  rescue Errno::ECHILD
  end
end

#redirect_io(io, path) ⇒ Object



185
186
187
188
# File 'lib/preforker.rb', line 185

def redirect_io(io, path)
  File.open(path, 'ab') { |fp| io.reopen(fp) } if path
  io.sync = true
end

#set_stderr_path(path) ⇒ Object



181
182
183
# File 'lib/preforker.rb', line 181

def set_stderr_path(path)
  redirect_io($stderr, path)
end

#set_stdout_path(path) ⇒ Object



177
178
179
# File 'lib/preforker.rb', line 177

def set_stdout_path(path)
  redirect_io($stdout, path)
end

#signal_each_worker(signal) ⇒ Object

delivers a signal to each worker



169
170
171
# File 'lib/preforker.rb', line 169

def signal_each_worker(signal)
  @workers.keys.each { |worker_pid| signal_worker(signal, worker_pid) }
end

#signal_quitObject



173
174
175
# File 'lib/preforker.rb', line 173

def signal_quit
  signal_worker(:QUIT, @pid_manager.pid)
end

#signal_worker(signal, worker_pid) ⇒ Object

delivers a signal to a worker and fails gracefully if the worker is no longer running.



160
161
162
163
164
165
166
# File 'lib/preforker.rb', line 160

def signal_worker(signal, worker_pid)
  begin
    Process.kill(signal, worker_pid)
  rescue Errno::ESRCH
    worker = @workers.delete(worker_pid) and worker.tmp.close rescue nil
  end
end

#spawn_missing_workers(new_workers_count = @number_of_workers, &init_block) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/preforker.rb', line 118

def spawn_missing_workers(new_workers_count = @number_of_workers, &init_block)
  new_workers_count.times do
    worker = Worker.new(@worker_block, self)
    worker_pid = fork do
      close_resources_worker_wont_use
      init_block.call if init_block
      worker.work
    end

    worker.pid = worker_pid
    @workers[worker_pid] = worker
  end
end

#startObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/preforker.rb', line 33

def start
  launch do |ready_write|
    $stdin.reopen("/dev/null")
    set_stdout_path(@options[:stdout_path])
    set_stderr_path(@options[:stderr_path])

    logger.info "Master started"

    pid_path = @options[:pid_path] || "./#{@app_name.downcase}.pid"
    @pid_manager = PidManager.new(pid_path)
    @signal_processor = SignalProcessor.new(self)

    spawn_missing_workers do
      ready_write.close
    end

    #tell parent we are ready
    ready_write.syswrite($$.to_s)
    ready_write.close rescue nil

    @signal_processor.start_signal_loop
  end
end

#stop(graceful = true) ⇒ Object

Terminates all workers, but does not exit master process



91
92
93
94
95
96
97
98
99
# File 'lib/preforker.rb', line 91

def stop(graceful = true)
  limit = Time.now + @timeout
  until @workers.empty? || Time.now > limit
    signal_each_worker(graceful ? :QUIT : :TERM)
    sleep(0.1)
    reap_all_workers
  end
  signal_each_worker(:KILL)
end