Module: JobManager::System
- Defined in:
- lib/jobmanager/system.rb
Overview
This module contains the system programming required to invoke the job process and record its output and exit status.
Constant Summary
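- SLEEP_DURATION_PIPE_OPEN_BUT_EMPTY = 0.5 (seconds to sleep when the output pipe is open but has no data to read)
- SLEEP_DURATION_PIPE_CLOSED = 0.5 (seconds between checks for the child's exit once the output pipe has closed)
- SLEEP_DURATION_WAIT_AFTER_KILL = 10.0 (seconds to wait after sending TERM or KILL before checking whether the child has exited)
- BUFFER_SIZE = 4096 (maximum number of bytes read from the pipe per call)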
Class Method Summary
- .invoke_command(command, log_stream, logger, optional_args = {}, &block) ⇒ Object
  Description: This method forks and executes the command parameter.
- .which(name, path = ENV["PATH"]) ⇒ Object
  Description: This method searches the path passed in for an executable with the name passed in.
Class Method Details
.invoke_command(command, log_stream, logger, optional_args = {}, &block) ⇒ Object
Description:
This method forks and executes the command parameter. It writes the output of the forked process (stdout and stderr combined) to the log_stream parameter, and it logs operational information (when the command is launched and finished, and its result) via the logger parameter.
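The fork-and-pipe mechanism can be sketched on its own. This is a minimal, simplified sketch rather than the module's code: `capture_output` is a hypothetical helper, the poll interval is an assumption, and timeouts, logging and executable lookup are left out. It uses only Ruby core calls (`IO.pipe`, `fork`, `IO#reopen`, `IO#read_nonblock`, `Process.wait2`).

```ruby
require 'stringio'

CAPTURE_BUFFER_SIZE = 4096   # bytes per read; same value as the module's BUFFER_SIZE
CAPTURE_POLL_INTERVAL = 0.05 # seconds; an assumption (the module sleeps 0.5 s)

# Hypothetical helper: run +command+ in a forked child, append its combined
# stdout/stderr to +log_stream+ (anything that responds to <<), and return
# the child's Process::Status.
def capture_output(command, log_stream)
  read_pipe, write_pipe = IO.pipe

  child_pid = fork do
    # Child: only writes to the pipe.
    read_pipe.close
    STDOUT.reopen(write_pipe)
    $stdout = STDOUT
    STDERR.reopen(write_pipe)
    $stderr = STDERR
    begin
      exec(command)
    rescue SystemCallError => e
      print "Exec failed for command (#{command}), error: #{e}!"
      $stdout.flush
    end
    exit!(1) # only reached if exec raised
  end

  # Parent: only reads from the pipe.
  write_pipe.close
  loop do
    begin
      log_stream << read_pipe.read_nonblock(CAPTURE_BUFFER_SIZE)
    rescue IO::WaitReadable
      sleep(CAPTURE_POLL_INTERVAL) # pipe open but nothing to read yet
    rescue EOFError
      break # every writer (the child) has closed its end of the pipe
    end
  end
  read_pipe.close

  _pid, status = Process.wait2(child_pid)
  status
end
```

For example, `capture_output("echo hello; echo oops 1>&2", StringIO.new)` should leave both lines in the StringIO. The sketch calls `exit!` after a failed `exec` so that the forked copy does not run the parent's `at_exit` handlers; the module's own code uses `exit(1)` instead.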
The optional arguments (symbol keys of optional_args) include:
- :timeout
  The number of seconds after which this method kills the forked process (sending TERM first, then KILL if the process has not exited). If this option is not specified, no timeout is applied and this method waits indefinitely for the forked process to complete.
- :command_path
  A colon-separated list of directories to search for the command's executable when the command is not given as an absolute path. If this option is not specified, ENV["PATH"] is used.
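The timeout behaviour (poll for the child's exit, then TERM, a grace period, then KILL) can be sketched as follows. `wait_with_timeout` and its keyword defaults are hypothetical. Unlike the module's code, which measures time with `Time.now` and waits a fixed `SLEEP_DURATION_WAIT_AFTER_KILL` after each signal, this sketch uses a monotonic clock and a configurable grace period, and it does a blocking wait after KILL on the assumption that KILL will end the process.

```ruby
# Hypothetical helper: wait up to +timeout+ seconds for child +pid+. On timeout,
# send TERM, wait +grace+ seconds, and send KILL if the child is still running.
# Returns [Process::Status, timed_out].
def wait_with_timeout(pid, timeout, grace: 1.0, poll: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    reaped, status = Process.waitpid2(pid, Process::WNOHANG) # nil while running
    return [status, false] if reaped
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep(poll)
  end

  # Timed out: TERM first, since the child may catch it and exit gracefully.
  Process.kill("TERM", pid)
  sleep(grace)
  reaped, status = Process.waitpid2(pid, Process::WNOHANG)
  return [status, true] if reaped

  # Still running: KILL cannot be caught or ignored.
  Process.kill("KILL", pid)
  _reaped, status = Process.waitpid2(pid) # blocking wait; assumes KILL takes effect
  [status, true]
end
```

A monotonic clock is used so that wall-clock adjustments cannot shorten or stretch the timeout.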
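Parameters:
- command
  The command to be invoked. The first whitespace-separated word is treated as the executable.
- log_stream
  The stream to which this method writes the forked process's output (stdout and stderr). It only needs to respond to <<.
- logger
  The logger instance to which this method writes operational information.
- optional_args
  A hash of optional arguments (see above).
- block
  Optional. If given, it is called in the parent process with the child's pid; this is intended primarily for testing.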
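Returns:
true if the command exited with status 0 and all of its output was written to log_stream; false otherwise (including when the executable cannot be found or the command times out).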
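The return value combines two things: how the child ended, as reported by Ruby's `Process::Status`, and whether every write to `log_stream` succeeded. The sketch below mirrors the branches the method's source uses for its log messages; `describe_status` and `job_succeeded?` are hypothetical helpers, not part of `JobManager::System`. Note that `Process::Status#success?` returns nil, not false, when the child did not exit normally (for example, when it was killed by a signal).

```ruby
# Hypothetical helper: the message the method would log for a finished child.
def describe_status(status)
  if status.success?
    "Job exited successfully"
  elsif status.exited?
    "Failed! - Job exited normally with exit code: #{status.exitstatus}."
  elsif status.signaled?
    "Failed! - Job exited abnormally due to signal: #{status.termsig}."
  else
    "Failed! - Job exited abnormally: #{status.inspect}"
  end
end

# Hypothetical helper: true only for exit status 0 AND no log write errors.
# Comparing with == true turns the nil from success? into false.
def job_succeeded?(status, log_write_ok)
  status.success? == true && log_write_ok
end
```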
# File 'lib/jobmanager/system.rb', line 53
# Requires 'English' (for $CHILD_STATUS) and 'pathname' to be loaded.

def self.invoke_command(command, log_stream, logger, optional_args = {}, &block)
  original_time = Time.now()
  success = false
  job_log_write_success = true
  timeout = optional_args[:timeout]
  env_path = optional_args[:command_path] || ENV["PATH"]

  # Parse out the executable from the command (eg. command = "ls -l /", exe = "ls")
  exe = command.split(' ')[0]

  # Search the path for the executable (if the executable has a
  # relative path), and ensure the file referenced is indeed
  # executable.
  if (!(exe_path = which(exe, env_path)))
    if (Pathname.new(exe).absolute?())
      logger.error("File #{exe} does not exist or is not executable!")
    else
      logger.error("File #{exe} does not exist in path (#{env_path}) or is not executable!")
    end
    return false
  end
  exe = exe_path

  # Rebuild the command with the full path of the executable.
  command_and_args = command.split(' ')
  command_and_args[0] = exe
  command = command_and_args.join(' ')

  # Create a pipe which the forked process will write all of its
  # output to (stdout and stderr), and this process will read said
  # output from. (Created after the executable check so that the
  # early return above does not leak the pipe.)
  read_pipe, write_pipe = IO.pipe

  # Fork a child process which will in turn exec the command.
  if (child_pid = fork())
    # The parent process.

    # This is primarily for testing purposes.
    if (block) then block.call(child_pid) end

    # The parent process will only read from this pipe.
    write_pipe.close()

    logger.info("Command (#{command}) launched, pid = #{child_pid}.")

    timed_out = false

    # while pipe is open and the timeout hasn't been reached
    while (!timeout || !(timed_out = Time.now() - original_time > timeout))
      bytes = ""
      begin
        bytes = read_pipe.read_nonblock(BUFFER_SIZE)
      rescue Errno::EAGAIN
        # EAGAIN will be raised if there is nothing to read on the
        # pipe.
        sleep(SLEEP_DURATION_PIPE_OPEN_BUT_EMPTY)
      rescue EOFError
        # EOFError will be raised if the writer has closed the
        # pipe.
        break
      rescue => error
        logger.error("Unexpected Error (#{error.class}) " +
                     "reading from the pipe associated with child process: " +
                     "#{error.message}")
        raise
      end

      if (bytes.length() > 0)
        begin
          log_stream << bytes
        rescue => e
          logger.error("Error writing to job log: #{e.message}")
          job_log_write_success = false
        end
      end
    end

    # while the child is alive and the timeout hasn't been reached
    while (!timeout || !(timed_out = Time.now() - original_time > timeout))
      # if the child process has exited, check the exit status
      if (Process.waitpid(child_pid, Process::WNOHANG))
        # success? will return nil if the child didn't exit normally
        child_success = $CHILD_STATUS.success?() ? true : false

        if (child_success)
          logger.info("Job exited successfully")
        else
          if ($CHILD_STATUS.exited?())
            logger.error("Failed! - Job exited normally with exit code: #{$CHILD_STATUS.exitstatus()}.")
          elsif ($CHILD_STATUS.signaled?())
            logger.error("Failed! - Job exited abnormally due to signal: #{$CHILD_STATUS.termsig()}.")
          else
            logger.error("Failed! - Job exited abnormally: #{$CHILD_STATUS.inspect()}")
          end
        end

        success = child_success && job_log_write_success
        break
      else
        sleep(SLEEP_DURATION_PIPE_CLOSED)
      end
    end

    # if the child process has reached its timeout, kill it.
    if (timed_out)
      logger.error("Timed out after #{timeout} seconds.")

      # Kill the child process with the TERM signal first. The
      # child process can catch this signal and exit gracefully.
      Process.kill("TERM", child_pid)
      logger.error("Sent signal TERM to child process.")
      sleep(SLEEP_DURATION_WAIT_AFTER_KILL)

      # Check if the child pid has exited.
      if (Process.waitpid(child_pid, Process::WNOHANG))
        logger.error("Reaped pid from child process.")
      else
        # The child pid has not exited. Send a KILL signal.
        # A process can not catch this signal.
        Process.kill("KILL", child_pid)
        logger.error("Sent signal KILL to child process.")
        sleep(SLEEP_DURATION_WAIT_AFTER_KILL)

        # Check if the child pid has exited.
        if (Process.waitpid(child_pid, Process::WNOHANG))
          logger.error("Reaped pid from child process.")
        else
          # The child should've exited already. Either the system
          # is VERY slow and the OS hasn't gotten around to
          # tearing down the process, or something very strange is
          # going on.
          logger.error("Failed to reap pid from child process.")
        end
      end
    end

    read_pipe.close()
    return success
  else
    # The child process.

    # The child process will only write to this pipe.
    read_pipe.close()

    # Redirect stdout and stderr to the pipe (to be read by the
    # parent process). Reset $stdout and $stderr, as they may
    # have earlier been set to something other than STDOUT, STDERR.
    STDOUT.reopen(write_pipe)
    $stdout = STDOUT
    STDERR.reopen(write_pipe)
    $stderr = STDERR

    # Exec the command!
    begin
      exec(command)
    rescue => e
      print "Exec failed for command (#{command}), error: #{e}!"
    end

    # We will only get here if an error was raised.
    exit(1)
  end
end
.which(name, path = ENV["PATH"]) ⇒ Object
Description:
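This method searches the directories in the path passed in, in order, for an executable with the name passed in, and returns the full pathname of the first match (a regular file that is executable). If name is already an absolute path, the path is ignored and name itself is checked. If no match is found, it returns nil.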
Parameters:
- name
  The name of the executable to search for.
- path
  A colon-separated list of directories to search (defaults to ENV["PATH"]).
Returns:
The pathname of the first match, or nil if no executable is found.
# File 'lib/jobmanager/system.rb', line 238

def self.which(name, path = ENV["PATH"])
  if (Pathname.new(name).absolute?())
    return (File.file?(name) && File.executable?(name)) ? name : nil
  else
    candidates = path.split(':').map { |dir| File.join(dir, name) }
    return candidates.find { |file| File.file?(file) && File.executable?(file) }
  end
end
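The search order and the executable check can be seen in a small, self-contained demonstration. The `which` method here is a standalone copy of the method's logic, defined at top level rather than on `JobManager::System`, and the temporary directory layout is purely illustrative.

```ruby
require 'pathname'
require 'tmpdir'
require 'fileutils'

# Standalone copy of the documented search logic.
def which(name, path = ENV["PATH"])
  if Pathname.new(name).absolute?
    (File.file?(name) && File.executable?(name)) ? name : nil
  else
    candidates = path.split(':').map { |dir| File.join(dir, name) }
    candidates.find { |file| File.file?(file) && File.executable?(file) }
  end
end

Dir.mktmpdir do |root|
  first  = File.join(root, "first")
  second = File.join(root, "second")
  FileUtils.mkdir_p([first, second])

  # Same name in both directories; only the copy in "second" is executable.
  File.write(File.join(first, "tool"), "#!/bin/sh\n")
  File.chmod(0644, File.join(first, "tool"))
  File.write(File.join(second, "tool"), "#!/bin/sh\n")
  File.chmod(0755, File.join(second, "tool"))

  search_path = "#{first}:#{second}"
  p which("tool", search_path)      # the copy in "second"; the one in "first" is skipped
  p which("missing", search_path)   # nil
  p which(File.join(first, "tool")) # nil: absolute, but not executable
end
```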