Module: Patriot::Util::System

Included in:
Command::ShCommand
Defined in:
lib/patriot/util/system.rb

Overview

module for interaction with OS

Defined Under Namespace

Classes: ExternalCommandException

Constant Summary collapse

STDOUT_SUFFIX =

suffix of file where stdout is written

".stdout"
STDERR_SUFFIX =

suffix of file where stderr is written

".stderr"
PATRIOT_TMP_DIR_KEY =

configuration key for tmp directory where stdout/stderr are written

"patriot.tmp.dir"
DEFAULT_PATRIOT_TMP_DIR =

default path to the tmp directory

"/tmp/patriot-workflow-scheduler"
MAX_ERROR_MSG_SIZE_KEY =

max size of error message included in exceptions

"patriot.max.error.size"
DEFAULT_MAX_ERROR_MSG_SIZE =

default max size of error message

256

Instance Method Summary collapse

Instance Method Details

#do_fork(cmd, dt, ts, tmp_dir = DEFAULT_PATRIOT_TMP_DIR) ⇒ Object

fork and execute the command (visible for test)



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/patriot/util/system.rb', line 35

# Fork a child process that runs +cmd+ with stdout/stderr redirected to files.
# (visible for test)
#
# The child asks tmp_dir for a directory based on its own pid, creates it,
# reopens STDOUT/STDERR onto "<pid>.stdout" / "<pid>.stderr" inside it and
# then replaces itself with +cmd+ via exec.
#
# @param cmd [String] command handed to Kernel#exec
# @param dt date component forwarded to tmp_dir
# @param ts timestamp forwarded to tmp_dir
# @param tmp_dir [String] base directory under which the output directory is created
# @return [Integer] pid of the forked child (as seen by the parent)
def do_fork(cmd, dt, ts, tmp_dir = DEFAULT_PATRIOT_TMP_DIR)
  fork do
    child_pid = Process.pid
    # With an argument list this resolves to the tmp_dir method, not the parameter.
    out_dir = tmp_dir(child_pid, dt, ts, tmp_dir)
    # Options must be passed as keywords: a positional Hash is rejected by
    # Ruby 3+, which would abort the child before any output file exists.
    FileUtils.mkdir_p(out_dir, :mode => 0777)
    STDOUT.reopen(File.join(out_dir, "#{child_pid}#{STDOUT_SUFFIX}"), "w")
    STDERR.reopen(File.join(out_dir, "#{child_pid}#{STDERR_SUFFIX}"), "w")
    exec(cmd)
  end
end

#execute_command(command) { ... } ⇒ Object

execute command on OS

Parameters:

  • command (String)

    command to be executed

Yields:

  • block for error handling



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/patriot/util/system.rb', line 51

# Execute a command on the OS in a forked child and wait for it to finish.
#
# The child's stdout/stderr are written to files under the directory
# configured by PATRIOT_TMP_DIR_KEY (DEFAULT_PATRIOT_TMP_DIR when unset).
# Reads @config (must respond to get(key, default)) and @logger
# (must respond to info/warn).
#
# @param command [String] command to be executed
# @yield [status, so, se] called when the command ends with a non-zero status:
#   the Process::Status, the stdout file path and the stderr file path
# @return [String] path to the stdout file when the command exits with status 0;
#   otherwise the value of the given block
# @raise [ExternalCommandException] when the command fails and no block is given,
#   or when a child that never created its output directory has already exited
def execute_command(command, &blk)
  time_obj = Time.now
  ts       = time_obj.to_i
  dt       = time_obj.strftime("%Y-%m-%d")

  tmp_dir_base = @config.get(PATRIOT_TMP_DIR_KEY, DEFAULT_PATRIOT_TMP_DIR)

  # the forked variable is used for checking whether fork invocation hangs.
  #  (due to https://redmine.ruby-lang.org/issues/5240 ?)
  forked = false
  until forked
    cid = do_fork(command, dt, ts, tmp_dir_base)
    tmpdir = tmp_dir(cid, dt, ts, tmp_dir_base)
    i = 0
    # If fork hangs, the output directory would not be created.
    # Poll once per second, up to six times, for the directory to appear.
    until forked || i > 5
      sleep(1)
      forked = File.exist?(tmpdir)
      i = i+1
    end
    # fork hanged, kill the hanged process (the outer loop then forks again).
    unless forked
      # check whether cid is id of child process to avoid to kill unrelated processes
      begin
        if Process.waitpid(cid, Process::WNOHANG).nil?
          @logger.warn("forked process :#{cid} hanged. kill #{cid}")
          Process.kill("KILL", cid)
          @logger.warn("SIGKILL sent to #{cid}")
          Process.waitpid(cid)
          @logger.warn("#{cid} is killed")
        else
          raise ExternalCommandException, "#{cid} is not a child of this"
        end
      rescue Exception => e
        # Nothing is swallowed here: the failure is logged and re-raised as is.
        @logger.warn "failed to kill hanged process #{cid}"
        raise e
      end
    end
  end

  @logger.info "executing #{command}: results stored in #{tmpdir}" 
  _, status = Process.waitpid2(cid)
  so = File.join(tmpdir, "#{cid.to_i}#{STDOUT_SUFFIX}")
  se = File.join(tmpdir, "#{cid.to_i}#{STDERR_SUFFIX}")

  @logger.info "#{command} is finished" 
  return so if status.exitstatus == 0
  @logger.warn "#{command} end with exit status #{status.exitstatus}" 
  if block_given?
    yield(status, so, se)
  else
    max_err_size = @config.get(MAX_ERROR_MSG_SIZE_KEY, DEFAULT_MAX_ERROR_MSG_SIZE)
    err_msg = "#{command}\n#{se} :"
    if !File.exist?(se)
      # The child can fail before it reopens STDERR; report that instead of
      # letting Errno::ENOENT hide the command failure.
      err_msg = "#{err_msg}\n (stderr file was not created)"
    else
      err_size = File.size(se)
      if err_size < max_err_size
        err_msg = "#{err_msg}\n#{File.read(se)}"
      else
        err_msg = "#{err_msg} \n the size of stderr is #{err_size} (>= #{max_err_size})"
      end
    end
    raise ExternalCommandException, err_msg
  end
end

#tmp_dir(pid, dt, ts, tmp_dir = DEFAULT_PATRIOT_TMP_DIR) ⇒ Object

get path to tmp directory (visible for test)



26
27
28
29
30
31
# File 'lib/patriot/util/system.rb', line 26

# Build the path of the directory holding a command's stdout/stderr files.
# (visible for test)
#
# The last path component is "<owner>_<YYYYmmdd_HHMMSS>", where owner is
# "j<job id>" when the current thread carries a value under
# Patriot::Worker::JOB_ID_IN_EXECUTION and "p<pid>" otherwise.
#
# @param pid process id used for the "p" prefix
# @param dt [String] path component placed directly under the base directory
# @param ts value accepted by Time.at, formatted into the directory name
# @param tmp_dir [String] base directory
# @return [String] tmp_dir/dt/<owner>_<timestamp>
def tmp_dir(pid, dt, ts, tmp_dir = DEFAULT_PATRIOT_TMP_DIR)
  job_id = Thread.current[Patriot::Worker::JOB_ID_IN_EXECUTION]
  owner  = job_id ? "j#{job_id}" : "p#{pid}"
  stamp  = Time.at(ts).strftime("%Y%m%d_%H%M%S")
  File.join(tmp_dir, dt, "#{owner}_#{stamp}")
end