Class: Magistrate::Worker
- Inherits:
-
Object
- Object
- Magistrate::Worker
- Defined in:
- lib/magistrate/worker.rb
Instance Attribute Summary collapse
-
#bounces ⇒ Object
readonly
Returns the value of attribute bounces.
-
#daemonize ⇒ Object
readonly
Returns the value of attribute daemonize.
-
#env ⇒ Object
readonly
Returns the value of attribute env.
-
#logs ⇒ Object
readonly
Returns the value of attribute logs.
-
#monitored ⇒ Object
Returns the value of attribute monitored.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#pid_file ⇒ Object
readonly
Returns the value of attribute pid_file.
-
#reset_target_state_to ⇒ Object
readonly
Returns the value of attribute reset_target_state_to.
-
#start_cmd ⇒ Object
readonly
Returns the value of attribute start_cmd.
-
#stop_cmd ⇒ Object
readonly
Returns the value of attribute stop_cmd.
-
#target_state ⇒ Object
Returns the value of attribute target_state.
-
#working_dir ⇒ Object
readonly
Returns the value of attribute working_dir.
Instance Method Summary collapse
- #alive? ⇒ Boolean
- #double_fork(command) ⇒ Object
-
#ensure_stop ⇒ Object
Ensure that a stop command actually stops the process.
-
#initialize(name, options = {}) ⇒ Worker
constructor
A new instance of Worker.
-
#load_previous_status ⇒ Object
Loads the number of times in a row this worker has been started without successfully staying running. This is stored in the @status_file.
- #log(str) ⇒ Object
-
#pid ⇒ Object
Fetch the PID from pid_file.
- #running? ⇒ Boolean
- #save_status ⇒ Object
-
#signal(sig, target_pid = nil) ⇒ Object
Send the given signal to this process.
-
#single_fork(command) ⇒ Object
Single-forks self-daemonizing processes; we want to wait for them to finish.
-
#spawn(command) ⇒ Object
Fork/exec the given command; returns immediately. +command+ is the String containing the shell command.
- #start ⇒ Object
- #state ⇒ Object
- #status ⇒ Object
- #stop ⇒ Object
-
#supervise! ⇒ Object
This is to be called when we first start managing a worker. It will check if the pid exists and, if so, whether the process is responding OK. It will take action based on the target state.
- #write_pid ⇒ Object
Constructor Details
#initialize(name, options = {}) ⇒ Worker
Returns a new instance of Worker.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/magistrate/worker.rb', line 9
#
# Builds a worker from its name and an options Hash.
#
# name    - name of the worker; used to build the pid/status file names.
# options - Hash of settings read by the constructor:
#           :daemonize   - truthy when Magistrate has to daemonize the command
#           :working_dir - directory the command is started in
#           :start_cmd   - shell command that starts the worker
#           :pid_path    - directory for "<name>.pid" and "<name>.status"
#                          (only used when :daemonize is set)
#           :stop_signal - signal sent to stop a daemonized worker ('TERM' by default)
#           :end_cmd     - shell command that stops a self-daemonizing worker
#           :pid_file    - pid file of a self-daemonizing worker
def initialize(name, options = {})
  @name        = name
  @daemonize   = options[:daemonize]
  @working_dir = options[:working_dir]
  @start_cmd   = options[:start_cmd]
  @pid_path    = options[:pid_path]

  # Always start from a numeric counter so supervise! can increment it even
  # for workers that have no status file to load a previous value from.
  @bounces = 0

  if @daemonize
    @pid_file        = File.join(@pid_path, "#{@name}.pid")
    @status_file     = File.join(@pid_path, "#{@name}.status")
    @stop_signal     = options[:stop_signal] || 'TERM'
    @previous_status = load_previous_status
    @bounces         = @previous_status[:bounces] || 0
  else
    @stop_cmd = options[:end_cmd]
    @pid_file = options[:pid_file]
  end

  @stop_timeout  = 5
  @start_timeout = 5

  @env = {}

  @target_state = :unknown
  @logs = []
end
Instance Attribute Details
#bounces ⇒ Object (readonly)
Returns the value of attribute bounces.
6 7 8 |
# Reader for @bounces: the constructor loads it from the previous status
# (falling back to 0) for daemonized workers; supervise! increments it after
# a start towards :running and resets it to 0 when state matches the target.
# File 'lib/magistrate/worker.rb', line 6 def bounces @bounces end |
#daemonize ⇒ Object (readonly)
Returns the value of attribute daemonize.
6 7 8 |
# Reader for @daemonize (the :daemonize option). When truthy, start uses
# double_fork and stop signals the pid; otherwise single_fork is used.
# File 'lib/magistrate/worker.rb', line 6 def daemonize @daemonize end |
#env ⇒ Object (readonly)
Returns the value of attribute env.
6 7 8 |
# Reader for @env: a Hash (empty after construction) whose pairs spawn
# copies into ENV of the forked child before exec.
# File 'lib/magistrate/worker.rb', line 6 def env @env end |
#logs ⇒ Object (readonly)
Returns the value of attribute logs.
6 7 8 |
# Reader for @logs: the Array of messages appended by #log and reported
# under :logs by #status.
# File 'lib/magistrate/worker.rb', line 6 def logs @logs end |
#monitored ⇒ Object
Returns the value of attribute monitored.
7 8 9 |
# Reader for @monitored.
# NOTE(review): none of the methods shown on this page assigns @monitored;
# presumably it is set through a writer by the caller — confirm.
# File 'lib/magistrate/worker.rb', line 7 def monitored @monitored end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
6 7 8 |
# Reader for @name: the worker name given to the constructor; used for the
# "<name>.pid" / "<name>.status" file names and the child's process title.
# File 'lib/magistrate/worker.rb', line 6 def name @name end |
#pid_file ⇒ Object (readonly)
Returns the value of attribute pid_file.
6 7 8 |
# Reader for @pid_file: "<pid_path>/<name>.pid" for daemonized workers,
# otherwise the :pid_file option. #pid reads it and #write_pid writes it.
# File 'lib/magistrate/worker.rb', line 6 def pid_file @pid_file end |
#reset_target_state_to ⇒ Object (readonly)
Returns the value of attribute reset_target_state_to.
6 7 8 |
# Reader for @reset_target_state_to: set to :running by supervise! while it
# handles a :forced_restart target; not assigned anywhere else on this page.
# File 'lib/magistrate/worker.rb', line 6 def reset_target_state_to @reset_target_state_to end |
#start_cmd ⇒ Object (readonly)
Returns the value of attribute start_cmd.
6 7 8 |
# Reader for @start_cmd (the :start_cmd option): the shell command that
# #start hands to double_fork or single_fork.
# File 'lib/magistrate/worker.rb', line 6 def start_cmd @start_cmd end |
#stop_cmd ⇒ Object (readonly)
Returns the value of attribute stop_cmd.
6 7 8 |
# Reader for @stop_cmd: taken from the :end_cmd option, and only for workers
# that are not daemonized; #stop runs it through single_fork.
# File 'lib/magistrate/worker.rb', line 6 def stop_cmd @stop_cmd end |
#target_state ⇒ Object
Returns the value of attribute target_state.
7 8 9 |
# Reader for @target_state: :unknown after construction; supervise! compares
# it with #state and acts on :forced_restart, :running and :stopped.
# File 'lib/magistrate/worker.rb', line 7 def target_state @target_state end |
#working_dir ⇒ Object (readonly)
Returns the value of attribute working_dir.
6 7 8 |
# Reader for @working_dir (the :working_dir option): the directory spawn
# changes into before exec, falling back to '/' when it is not set.
# File 'lib/magistrate/worker.rb', line 6 def working_dir @working_dir end |
Instance Method Details
#alive? ⇒ Boolean
286 287 288 289 290 291 292 |
# File 'lib/magistrate/worker.rb', line 286
#
# Checks whether the process behind #pid exists by sending it signal 0.
#
# Returns true when the process exists (including when it exists but belongs
# to another user, which the kernel reports as EPERM), false when the pid is
# unknown or the probe fails for any other reason.
def alive?
  worker_pid = pid
  return false unless worker_pid

  begin
    ::Process.kill(0, worker_pid)
    true
  rescue Errno::EPERM
    # The process is there; we merely lack permission to signal it.
    true
  rescue StandardError
    false
  end
end
#double_fork(command) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/magistrate/worker.rb', line 154
#
# Starts +command+ through an intermediate child so that the worker is not a
# direct child of this process and we do not have to wait for it to finish.
# The intermediate child spawns the command, reports the worker's pid over a
# pipe and exits.
#
# command - String containing the shell command.
#
# Returns the Integer pid reported by the intermediate child, or nil when the
# child did not report a numeric pid.
def double_fork(command)
  reader, writer = IO.pipe

  begin
    intermediate = fork do
      reader.close
      STDOUT.reopen(writer)
      puts spawn(command).to_s # send pid back to forker
      STDOUT.flush
    end

    ::Process.waitpid(intermediate, 0)
    writer.close

    # gets returns nil when the child died before writing anything.
    reported = reader.gets
    reported.to_s.strip =~ /\A\d+\z/ ? reported.to_i : nil
  ensure
    # make sure the file descriptors get closed no matter what
    reader.close unless reader.closed?
    writer.close unless writer.closed?
  end
end
#ensure_stop ⇒ Object
Ensure that a stop command actually stops the process. Force kill if necessary.
Returns nothing
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/magistrate/worker.rb', line 228
#
# Ensure that a stop command actually stops the process. Force kill if
# necessary.
#
# The process is probed once per second, up to @stop_timeout times, with
# Process.kill(0, pid). The probe is done directly rather than through
# #signal because #signal swallows every exception, which would hide the
# Errno::ESRCH that tells us the process is gone.
#
# Returns nothing
def ensure_stop
  log "Ensuring stop..."

  target = pid
  unless target
    log "Stop called but pid is unknown"
    return
  end

  # Poll to see if it's dead
  @stop_timeout.times do
    begin
      ::Process.kill(0, target)
    rescue Errno::ESRCH
      # It died. Good.
      return
    rescue Errno::EPERM
      # Still exists but is not ours to signal; keep waiting.
    end

    sleep 1
  end

  # last resort
  signal('KILL')
  log "Still alive after #{@stop_timeout}s; sent SIGKILL"
end
#load_previous_status ⇒ Object
Loads the number of times in a row this worker has been started without successfully staying running. This is stored in the @status_file.
42 43 44 45 46 |
# File 'lib/magistrate/worker.rb', line 42
#
# Loads the status Hash that #save_status stored in @status_file; the
# constructor reads the :bounces counter from it.
#
# The file is written by this class with Symbol keys and values, which the
# safe YAML loader rejects, so the full loader is used where Psych makes the
# distinction. NOTE(review): this assumes the status file is only ever
# written by Magistrate itself.
#
# Returns the stored Hash, or an empty Hash when the file is missing,
# unreadable, unparsable or does not contain a Hash.
def load_previous_status
  loaded = File.open(@status_file) do |file|
    YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(file) : YAML.load(file)
  end

  loaded.is_a?(Hash) ? loaded : {}
rescue StandardError
  {}
end
#log(str) ⇒ Object
36 37 38 |
# File 'lib/magistrate/worker.rb', line 36
#
# Appends a message to this worker's in-memory log.
#
# str - the message to record.
#
# Returns the @logs Array.
def log(str)
  @logs.push(str)
end
#pid ⇒ Object
Fetch the PID from pid_file. If the pid_file does not exist, then use the PID from the last time it was read. If it has never been read, then return nil.
Returns Integer(pid) or nil
268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/magistrate/worker.rb', line 268
#
# Fetch the PID from pid_file. If the pid_file does not exist (or does not
# consist of a single number), then use the PID from the last time it was
# read. If it has never been read, then return nil.
#
# Returns Integer(pid) or nil
def pid
  contents = begin
    File.read(@pid_file).strip
  rescue StandardError
    ''
  end

  # \A..\z anchor the whole file; ^..$ would accept any numeric *line*.
  @pid = contents.to_i if contents =~ /\A\d+\z/
  @pid
end
#running? ⇒ Boolean
64 65 |
# File 'lib/magistrate/worker.rb', line 64
#
# Tells whether the worker is currently reported as running.
#
# The original body was empty, so this predicate always returned nil.
# NOTE(review): implemented in terms of #state, so an unmonitored worker is
# never "running" — confirm that this is the intended meaning.
#
# Returns true when #state is :running, false otherwise.
def running?
  state == :running
end
#save_status ⇒ Object
48 49 50 51 52 |
# File 'lib/magistrate/worker.rb', line 48
#
# Writes the #status Hash to @status_file as YAML. Workers without a status
# file (those that are not daemonized) are skipped.
#
# Saving is best effort: a failure is recorded through #log instead of being
# raised, so supervision continues, but it is no longer silently discarded.
#
# Returns nil when there is no status file or the write failed.
def save_status
  return unless @status_file

  File.open(@status_file, 'w') { |file| YAML.dump(status, file) }
rescue StandardError => e
  log "Unable to save status to #{@status_file}: #{e.message}"
  nil
end
#signal(sig, target_pid = nil) ⇒ Object
Send the given signal to this process.
Returns nothing
256 257 258 259 260 261 |
# File 'lib/magistrate/worker.rb', line 256
#
# Send the given signal to this process.
#
# sig        - signal name (String or Symbol, e.g. 'TERM', :KILL) or number
#              (Integer or numeric String).
# target_pid - pid to signal; defaults to #pid.
#
# Delivery is best effort: failures are logged rather than raised, so callers
# cannot use this method to detect a dead process.
#
# Returns the result of Process.kill, or nil when the pid is unknown or the
# signal could not be delivered.
def signal(sig, target_pid = nil)
  target_pid ||= pid

  unless target_pid
    log "Cannot send signal '#{sig}': pid is unknown"
    return
  end

  # Process.kill wants a name or an Integer; turn numeric strings into
  # Integers and Symbols into plain names.
  sig = sig.to_s =~ /\A-?\d+\z/ ? sig.to_s.to_i : sig.to_s unless sig.is_a?(Integer)

  log "Sending signal '#{sig}' to pid #{target_pid}"

  begin
    ::Process.kill(sig, target_pid)
  rescue StandardError => e
    log "Signal '#{sig}' to pid #{target_pid} failed: #{e.message}"
    nil
  end
end
#single_fork(command) ⇒ Object
Single-forks self-daemonizing processes; we want to wait for them to finish.
143 144 145 146 147 148 149 150 151 152 |
# File 'lib/magistrate/worker.rb', line 143
#
# Runs +command+ through #spawn and waits for it to finish. Used for
# self-daemonizing commands, which return once they have detached.
#
# An unsuccessful run is recorded through #log: either the non-zero exit code
# or, when the command was killed, the terminating signal (which the old
# "status >> 8" arithmetic reported as exit code 0).
#
# command - String containing the shell command.
#
# Returns the pid of the spawned process.
def single_fork(command)
  child = spawn(command)
  _, result = ::Process.waitpid2(child, 0)

  if result.signaled?
    log "Command terminated by signal #{result.termsig}"
  elsif result.exitstatus != 0
    log "Command exited with non-zero code = #{result.exitstatus}"
  end

  child
end
#spawn(command) ⇒ Object
Fork/exec the given command, returns immediately
+command+ is the String containing the shell command
Returns the pid of the forked child (single_fork and double_fork rely on it).
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/magistrate/worker.rb', line 183
#
# Fork/exec the given command, returns immediately.
#
# The child becomes a session leader, changes into @working_dir (or '/'),
# points stdin/stdout/stderr at /dev/null, closes descriptors 3..256, exports
# @env into ENV and finally execs the command.
#
# +command+ is the String containing the shell command
#
# Returns the pid of the forked child.
def spawn(command)
  fork do
    ::Process.setsid

    Dir.chdir(@working_dir || '/')

    $0 = "Magistrate Worker: #{@name}"

    STDIN.reopen "/dev/null"
    STDOUT.reopen '/dev/null'
    STDERR.reopen STDOUT
    # TODO: redirecting output to a log file or log command is not implemented.

    # close any other file descriptors
    3.upto(256) { |fd| IO.new(fd).close rescue nil }

    if @env.is_a?(Hash)
      # ENV only accepts String keys; Symbol keys would abort the child.
      @env.each { |key, value| ENV[key.to_s] = value.to_s }
    end

    exec command unless command.empty?
  end
end
#start ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/magistrate/worker.rb', line 103
#
# Starts the worker with @start_cmd and remembers the resulting pid in @pid.
# Daemonized workers go through double_fork and get their pid file written
# here; self-daemonizing workers go through single_fork.
#
# Returns the pid of the started process.
def start
  unless @daemonize
    log "Starting as self-daemonizing with single_fork"
    return @pid = single_fork(@start_cmd)
  end

  log "Starting as daemon with double_fork"
  @pid = double_fork(@start_cmd)
  # TODO: Should check if the pid really exists as we expect
  write_pid
  @pid
end
#state ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/magistrate/worker.rb', line 67
#
# Reports the worker's current state.
#
# Returns :unmonitored when the target state is :unmonitored or :unknown;
# otherwise :running or :stopped depending on #alive?.
def state
  case @target_state
  when :unmonitored, :unknown
    :unmonitored
  else
    alive? ? :running : :stopped
  end
end
#status ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/magistrate/worker.rb', line 54
#
# Collects a snapshot of the worker; this is what #save_status dumps.
#
# Returns a Hash with the keys :state, :target_state, :pid, :bounces and
# :logs, in that order.
def status
  snapshot = {}
  snapshot[:state]        = state
  snapshot[:target_state] = target_state
  snapshot[:pid]          = pid
  snapshot[:bounces]      = @bounces
  snapshot[:logs]         = @logs
  snapshot
end
#stop ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/magistrate/worker.rb', line 116
#
# Stops the worker.
#
# Daemonized workers are sent @stop_signal and then probed once per second,
# up to @stop_timeout times; if the process is still there afterwards it is
# sent SIGKILL. When the pid is unknown nothing is signalled and the fact is
# logged, instead of failing inside Process.kill with a nil pid.
#
# Self-daemonizing workers are stopped by running @stop_cmd, followed by
# #ensure_stop.
def stop
  if @daemonize
    target = pid
    unless target
      log "Stop called but pid is unknown"
      return
    end

    signal(@stop_signal, target)

    # Poll to see if it's dead
    @stop_timeout.times do
      begin
        ::Process.kill(0, target)
      rescue Errno::ESRCH
        # It died. Good.
        log "Process stopped"
        return
      end

      sleep 1
    end

    signal('KILL', target)
    log "Still alive after #{@stop_timeout}s; sent SIGKILL"
  else
    single_fork(@stop_cmd)
    ensure_stop
  end
end
#supervise! ⇒ Object
This is to be called when we first start managing a worker. It will check if the pid exists and, if so, whether the process is responding OK. It will take action based on the target state.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/magistrate/worker.rb', line 82
#
# Compares the current #state with @target_state and acts on a mismatch:
#
#   :forced_restart - stop, then start, and record :running in
#                     @reset_target_state_to
#   :running        - start and count a bounce
#   :stopped        - stop
#
# When state and target agree the bounce counter is reset to 0. The status is
# saved in every case.
def supervise!
  log "Supervising. Is: #{state}. Target: #{@target_state}"

  if state == @target_state
    @bounces = 0
  else
    case @target_state
    when :forced_restart
      @reset_target_state_to = :running
      log "Restart: Stopping, then Starting, then reporting new target_state of :running"
      stop
      start
    when :running
      start
      # @bounces may still be nil for a worker that never loaded a status.
      @bounces = @bounces.to_i + 1
    when :stopped
      stop
    end
  end

  save_status
end
#write_pid ⇒ Object
280 281 282 283 284 |
# File 'lib/magistrate/worker.rb', line 280
#
# Overwrites @pid_file with the current value of @pid.
#
# Returns the number of bytes written.
def write_pid
  pid_io = File.open(@pid_file, 'w')
  begin
    pid_io.write(@pid)
  ensure
    pid_io.close
  end
end