Module: JobManager::System

Defined in:
lib/jobmanager/system.rb

Overview

This module contains the system programming required to invoke the job process and record its output and exit status.

Constant Summary collapse

# Seconds to sleep between read attempts while the child's output pipe is
# still open but currently has nothing to read.
SLEEP_DURATION_PIPE_OPEN_BUT_EMPTY = 0.5
# Seconds to sleep between exit-status polls once the read loop has ended
# and the child has not exited yet.
SLEEP_DURATION_PIPE_CLOSED = 0.5
# Seconds to wait after sending TERM (and again after KILL) before checking
# whether the child process can be reaped.
SLEEP_DURATION_WAIT_AFTER_KILL = 10.0
# Maximum number of bytes requested from the pipe per non-blocking read.
BUFFER_SIZE = 4096

Class Method Summary collapse

Class Method Details

.invoke_command(command, log_stream, logger, optional_args = {}, &block) ⇒ Object

Description:

This method forks and executes the command parameter. It writes the output (stdout and stderr) of the forked process to the parameter log_stream. This method logs operational information (when the command is started, when it finishes, and the result) via the logger parameter.

The optional arguments include:

timeout

The timeout (in seconds) after which this process will kill the forked process. If this option is not specified, no timeout is applied and this process will wait indefinitely for the forked process to complete.

command_path

The path this method will search through to find the passed in command if the command has a relative path. If this option is not specified, the environment path will be used.

Parameters:

command

The command to be invoked.

log_stream

The stream to which this method will write the forked process’s output (stdout and stderr).

logger

The logger instance to which this method will write operational information.

optional_args

A hash table of optional arguments.

Returns:

true if the command was successful, false otherwise.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/jobmanager/system.rb', line 53

# Forks a child process that execs +command+, copies everything the child
# writes to stdout and stderr into +log_stream+, and reports progress and
# the final result through +logger+.
#
# Parameters:
#   command       - The command line to run.  It is split on whitespace; the
#                   first word is the executable, which is resolved with
#                   which() before the command is rebuilt and exec'ed.
#   log_stream    - Receives the child's output via <<.
#   logger        - Receives operational messages via info and error.
#   optional_args - Hash of options:
#                     :timeout      - seconds after which the child is sent
#                                     TERM and, if still not reaped, KILL.
#                                     No timeout is applied when absent.
#                     :command_path - search path for a relative executable;
#                                     ENV["PATH"] is used when absent.
#   block         - Optional.  Called in the parent with the child's pid
#                   right after the fork.
#
# Returns true only if the child exited successfully and every write to
# +log_stream+ succeeded.  Returns false if the command is empty, the
# executable cannot be found, the child fails, or the timeout is reached.
def self.invoke_command(command,
                        log_stream,
                        logger,
                        optional_args = {},
                        &block)

  # Both pipe ends start out nil so the ensure clause below can tell
  # whether the pipe was ever created.
  read_pipe = write_pipe = nil
  original_time = Time.now()
  success = false
  job_log_write_success = true
  timeout = optional_args[:timeout]
  env_path = optional_args[:command_path] || ENV["PATH"]

  # Parse out the executable from the command (eg. command = "ls -l /", exe = "ls")
  command_and_args = command.split(' ')
  exe = command_and_args[0]

  # An empty or whitespace-only command has no executable to look up;
  # report it instead of failing inside the path search.
  if (exe.nil?)
    logger.error("Command is empty!")
    return false
  end

  # Search the path for the executable (if the executable has a
  # relative path), and ensure the file referenced is indeed
  # executable.
  if (!(exe_expanded_path = which(exe, env_path)))
    if (Pathname.new(exe).absolute?())
      logger.error("File #{exe} does not exist or is not executable!")
    else
      logger.error("File #{exe} does not exist in path (#{env_path}) or is not executable!")
    end
    return false
  end

  # Rebuild the command with the full path of the executable.
  command_and_args[0] = exe_expanded_path
  command = command_and_args.join(' ')

  # Create a pipe which the forked process will write all of its
  # output to (stdout and stderr), and this process will read said
  # output from.  It is created only now, after the early returns
  # above, so those paths have no descriptors to release.
  read_pipe, write_pipe = IO.pipe

  # Fork a child process which will in turn exec the command.
  if (child_pid = fork())
    # The parent process.

    # This is primarily for testing purposes.
    block.call(child_pid) if block

    # The parent process will only read from this pipe.
    write_pipe.close()

    logger.info("Command (#{command}) launched, pid = #{child_pid}.")

    timed_out = false

    # while pipe is open and the timeout hasn't been reached
    while (!timeout || !(timed_out = Time.now() - original_time > timeout))
      bytes = ""
      begin
        bytes = read_pipe.read_nonblock(BUFFER_SIZE)
      rescue Errno::EAGAIN
        # EAGAIN will be raised if there is nothing to read on the
        # pipe.
        sleep(SLEEP_DURATION_PIPE_OPEN_BUT_EMPTY)
      rescue EOFError
        # EOFError will be raised if the writer has closed the
        # pipe.
        break
      rescue => error
        logger.error("Unexpected Error (#{error.class}) " +
                     "reading from the pipe associated with child process:" +
                     "#{error.message}")
        raise
      end

      if (bytes.length() > 0)
        begin
          log_stream << bytes
        rescue => e
          # A failed log write does not stop the job, but it does make
          # the overall result false.
          logger.error("Error writing to job log: #{e.message}")
          job_log_write_success = false
        end
      end
    end

    # while the child is alive and the timeout hasn't been reached
    while (!timeout || !(timed_out = Time.now() - original_time > timeout))

      # if the child process has exited, check the exit status
      if (Process.waitpid(child_pid, Process::WNOHANG))

        # success? will return nil if the child didn't exit normally
        child_success = $CHILD_STATUS.success?() ? true : false

        if (child_success)
          logger.info("Job exited successfully")
        else
          if ($CHILD_STATUS.exited?())
            logger.error("Failed! - Job exited normally with exit code: #{$CHILD_STATUS.exitstatus()}.")
          elsif ($CHILD_STATUS.signaled?())
            logger.error("Failed! - Job exited abnormally due to signal: #{$CHILD_STATUS.termsig()}.")
          else
            logger.error("Failed! - Job exited abnormally: #{$CHILD_STATUS.inspect()}")
          end
        end
        success = child_success && job_log_write_success

        break
      else
        sleep(SLEEP_DURATION_PIPE_CLOSED)
      end
    end

    # if the child process has reached its timeout, kill it.
    if (timed_out)
      logger.error("Timed out after #{timeout} seconds.")

      # Kill the child process with the TERM signal first.  The
      # child process can catch this signal and exit gracefully.
      Process.kill("TERM", child_pid)
      logger.error("Sent signal TERM to child process.")

      sleep(SLEEP_DURATION_WAIT_AFTER_KILL)

      # Check if the child pid has exited.
      if (Process.waitpid(child_pid, Process::WNOHANG))
        logger.error("Reaped pid from child process.")
      else
        # The child pid has not exited.  Send a KILL signal.
        # A process can not catch this signal.
        Process.kill("KILL", child_pid)
        logger.error("Sent signal KILL to child process.")

        sleep(SLEEP_DURATION_WAIT_AFTER_KILL)

        # Check if the child pid has exited.
        if (Process.waitpid(child_pid, Process::WNOHANG))
          logger.error("Reaped pid from child process.")
        else
          # The child should've exited already.  Either the system
          # is VERY slow and the OS hasn't gotten around to
          # tearing down the process, or something very strange is
          # going on.
          logger.error("Failed to reap pid from child process.")
        end
      end
    end

    return success
  else
    # The child process.

    # The child process will only write to this pipe.
    read_pipe.close()

    # Redirect stdout and stderr to the pipe (to be read by the
    # parent process).  Reset $stdout and $stderr, as they may
    # have earlier been set to something other than STDOUT,STDERR.
    STDOUT.reopen(write_pipe); $stdout = STDOUT
    STDERR.reopen(write_pipe); $stderr = STDERR

    # Exec the command!
    begin
      exec(command)
    rescue => e
      print "Exec failed for command (#{command}), error: #{e}!"
    end

    # We will only get here if an error was raised.  Leave with exit!
    # rather than exit: exit raises SystemExit, which would unwind
    # through the caller's ensure blocks and run at_exit handlers
    # inherited from the parent inside this forked copy.  exit! skips
    # that cleanup, so flush the message above explicitly first.
    $stdout.flush()
    exit!(1)
  end
ensure
  # Release whichever pipe ends are still open on every way out of the
  # parent: normal return, early return, or an exception.
  [read_pipe, write_pipe].each do |pipe_end|
    pipe_end.close() if pipe_end && !pipe_end.closed?()
  end
end

.which(name, path = ENV["PATH"]) ⇒ Object

Description:

This method searches the path passed in for an executable with the name passed in. It searches in order and returns the absolute pathname of the first match.

Parameters:

name

The name of the executable to search for.

path

The path to search.

Returns:

The absolute pathname of the first match, or nil if no match is found.



238
239
240
241
242
243
244
245
# File 'lib/jobmanager/system.rb', line 238

# Locates an executable file by name.
#
# Parameters:
#   name - The executable to look for.  An absolute pathname is checked
#          directly and +path+ is not consulted.
#   path - A ':'-separated list of directories to search, in order.
#          Defaults to ENV["PATH"].
#
# Returns the pathname of the first candidate that is a regular file and
# executable, or nil if there is none (including when +path+ is nil).
def self.which(name, path = ENV["PATH"])
  if (Pathname.new(name).absolute?())
    return (File.file?(name) && File.executable?(name)) ? name : nil
  end

  # With no search path at all (for example PATH is unset) nothing can match.
  return nil if path.nil?

  path.split(':').each do |dir|
    # File.join("", name) yields "/name", which would probe the filesystem
    # root for an empty path entry; skip such entries instead.
    next if dir.empty?

    candidate = File.join(dir, name)
    return candidate if File.file?(candidate) && File.executable?(candidate)
  end

  nil
end