Class: InspecPlugins::Parallelism::Runner
- Inherits:
-
Object
- Object
- InspecPlugins::Parallelism::Runner
- Defined in:
- lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb
Instance Attribute Summary collapse
-
#invocations ⇒ Object
Returns the value of attribute invocations.
-
#log_path ⇒ Object
Returns the value of attribute log_path.
-
#run_in_background ⇒ Object
Returns the value of attribute run_in_background.
-
#sub_cmd ⇒ Object
Returns the value of attribute sub_cmd.
-
#total_jobs ⇒ Object
Returns the value of attribute total_jobs.
Instance Method Summary collapse
-
#cleanup_child_processes ⇒ Object
Still in parent. Loop over children and check for finished processes.
- #cleanup_daemon_process ⇒ Object
- #cleanup_empty_error_log_files ⇒ Object
- #fork_another_process ⇒ Object
-
#initialize(invocations, cli_options, sub_cmd = "exec") ⇒ Runner
constructor
A new instance of Runner.
- #initiate_background_run ⇒ Object
- #kill_child_processes ⇒ Object
- #rename_error_log_files ⇒ Object
- #run ⇒ Object
- #should_start_more_jobs? ⇒ Boolean
- #spawn_another_process ⇒ Object
- #update_ui_poll_select(target_pid = nil) ⇒ Object
- #update_ui_with_line(pid, update_line) ⇒ Object
- #wait_for_child_processes_to_die ⇒ Object
Constructor Details
#initialize(invocations, cli_options, sub_cmd = "exec") ⇒ Runner
Returns a new instance of Runner.
10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 10

# Returns a new instance of Runner.
#
# @param invocations [Array] pending invocations to run
# @param cli_options [Hash] options read with the string keys "jobs", "bg",
#   "ui" and "log_path"
# @param sub_cmd [String] sub-command name, stored in @sub_cmd
def initialize(invocations, cli_options, sub_cmd = "exec")
  @invocations = invocations
  @sub_cmd = sub_cmd
  # Fall back to the physical processor count when no "jobs" option is given.
  @total_jobs = cli_options["jobs"] || Concurrent.physical_processor_count
  @child_tracker = {}
  @child_tracker_persisted = {}
  @run_in_background = cli_options["bg"]
  # The UI reporter is only built for foreground runs.
  unless run_in_background
    @ui = InspecPlugins::Parallelism::SuperReporter.make(cli_options["ui"], total_jobs, invocations)
  end
  @log_path = cli_options["log_path"]
end
Instance Attribute Details
#invocations ⇒ Object
Returns the value of attribute invocations.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def invocations @invocations end |
#log_path ⇒ Object
Returns the value of attribute log_path.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def log_path @log_path end |
#run_in_background ⇒ Object
Returns the value of attribute run_in_background.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def run_in_background @run_in_background end |
#sub_cmd ⇒ Object
Returns the value of attribute sub_cmd.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def sub_cmd @sub_cmd end |
#total_jobs ⇒ Object
Returns the value of attribute total_jobs.
8 9 10 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8 def total_jobs @total_jobs end |
Instance Method Details
#cleanup_child_processes ⇒ Object
Still in parent. Loop over children and check for finished processes.
173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 173

# Still in parent. Loop over children and check for finished processes.
#
# For every tracked child that has exited: do a final read of its pipe, log
# its exit status, notify the UI (foreground runs only) and stop tracking it.
#
# @return [Hash] the child tracker, as the original `each` call returned
def cleanup_child_processes
  # Walk a snapshot of the pids so that removing entries below never mutates
  # the hash that is being iterated.
  @child_tracker.keys.each do |pid|
    next unless Process.wait(pid, Process::WNOHANG)

    # child exited - status in $?; capture it immediately so that nothing
    # called below can replace it before it is logged.
    exit_status = $?

    # Expect to (probably) find EOF marker on the pipe, and close it if so
    update_ui_poll_select(pid)
    create_logs(pid, "#{Time.now.iso8601} Exit code: #{exit_status}\n")
    @ui.child_exited(pid) unless run_in_background
    @child_tracker.delete pid
  end
  @child_tracker
end
#cleanup_daemon_process ⇒ Object
59 60 61 62 63 64 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 59 def cleanup_daemon_process current_process_id = Process.pid Process.kill(9, current_process_id) # DO NOT TRY TO REFACTOR IT THIS WAY # Calling Process.kill(9,Process.pid) kills the "stopper" process itself, rather than the one it's trying to stop. end |
#cleanup_empty_error_log_files ⇒ Object
66 67 68 69 70 71 72 73 74 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 66

# Deletes every zero-length `*.err` file found in the `logs` directory under
# `log_path` (or under the current working directory when no log path is set).
def cleanup_empty_error_log_files
  base_dir = log_path || Dir.pwd
  Dir.glob("#{base_dir}/logs/*.err").each do |candidate|
    # Skip files that vanished since the glob, and files that have content.
    next unless File.exist?(candidate)
    next if File.size?(candidate)

    File.delete(candidate)
  end
end
#fork_another_process ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 139

# Takes the next invocation off the queue and forks a child to run it.
#
# The parent keeps the read end of a pipe in @child_tracker, keyed by the
# child's pid. The child replaces $stdout with the write end, runs the
# invocation and always exits.
def fork_another_process
  invocation = invocations.shift[:value] # Be sure to do this shift() in parent process

  # thing_that_reads_from_the_child, thing_that_writes_to_the_parent = IO.pipe
  child_reader, parent_writer = IO.pipe
  if (child_pid = Process.fork)
    # In parent with newly forked child
    parent_writer.close
    @child_tracker[child_pid] = { io: child_reader }
    @ui.child_forked(child_pid, invocation) unless run_in_background
  else
    # In child
    child_reader.close
    # replace stdout with writer
    $stdout = parent_writer
    create_logs(Process.pid, nil, $stderr)

    begin
      create_logs(
        Process.pid,
        "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
      )
      runner_invocation(invocation)
    rescue StandardError => e
      $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
      $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
    end

    # should be unreachable but child MUST exit
    exit(42)
  end
end
#initiate_background_run ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 51

# Daemonizes the current process; on a local Windows host it exits through
# the UI with :usage_error instead.
def initiate_background_run
  return Inspec::UI.new.exit(:usage_error) if Inspec.locally_windows?

  Process.daemon(true, true)
end
#kill_child_processes ⇒ Object
76 77 78 79 80 81 82 83 84 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 76

# Sends SIGKILL to every tracked child, then waits for the children to exit.
#
# A failure to signal one child is reported on $stderr and does not stop the
# remaining children from being signalled.
def kill_child_processes
  @child_tracker.each_key do |pid|
    Process.kill("SIGKILL", pid)
  rescue StandardError => e
    # StandardError (not Exception) so that interrupts and exit requests
    # raised while shutting down are not swallowed here.
    $stderr.puts "Error while shutting down process #{pid}: #{e.message}"
  end
  # Waiting for child processes to die after they have been killed
  wait_for_child_processes_to_die
end
#rename_error_log_files ⇒ Object
100 101 102 103 104 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 100

# Calls rename_error_log for every entry in @child_tracker_persisted, passing
# the entry's :error_log_file and the pid it is keyed by.
def rename_error_log_files
  @child_tracker_persisted.each_pair do |child_pid, details|
    error_log = details[:error_log_file]
    rename_error_log(error_log, child_pid)
  end
end
#run ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 23

# Main scheduling loop: starts jobs while capacity allows, polls child
# output, reaps finished children, and repeats until no invocations remain
# and no children are tracked. Log clean-up runs afterwards.
def run
  initiate_background_run if run_in_background # running a process as daemon changes parent process pid
  original_stdout_stream = ChefLicensing::Config.output
  begin
    until invocations.empty? && @child_tracker.empty?
      # Changing output to STDERR to avoid the output interruption between runs
      ChefLicensing::Config.output = STDERR
      while should_start_more_jobs?
        if Inspec.locally_windows?
          spawn_another_process
        else
          fork_another_process
        end
      end

      update_ui_poll_select
      cleanup_child_processes
      sleep 0.1
    end
  ensure
    # Reset output to the original STDOUT stream as a safe measure. Done in
    # `ensure` so the global setting is restored even if the loop raises.
    ChefLicensing::Config.output = original_stdout_stream
  end

  # Requires renaming operations on windows only
  # Do Rename and delete operations after all child processes have exited successfully
  rename_error_log_files if Inspec.locally_windows?
  cleanup_empty_error_log_files
  cleanup_daemon_process if run_in_background
end
#should_start_more_jobs? ⇒ Boolean
106 107 108 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 106

# True when fewer children are tracked than total_jobs allows and at least
# one invocation is still queued.
def should_start_more_jobs?
  return false unless @child_tracker.length < total_jobs

  !invocations.empty?
end
#spawn_another_process ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 110

# Takes the next invocation off the queue and runs it in a spawned process.
#
# The child's stdout goes to a pipe whose read end is kept in @child_tracker;
# its stderr goes to a fresh `.err` file in the `logs` directory under
# `log_path` (or the current working directory). Failures are reported on
# $stderr rather than raised.
def spawn_another_process
  invocation = invocations.shift[:value]

  child_reader, parent_writer = IO.pipe
  error_log_file = nil
  begin
    logs_dir_path = log_path || Dir.pwd
    log_dir = File.join(logs_dir_path, "logs")
    FileUtils.mkdir_p(log_dir)
    error_log_file = File.open("#{log_dir}/#{Time.now.nsec}.err", "a+")
    cmd = "#{$0} #{sub_cmd} #{invocation}"
    log_msg = "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
    child_pid = Process.spawn(cmd, out: parent_writer, err: error_log_file.path)

    # Logging
    create_logs(child_pid, log_msg)
    @child_tracker[child_pid] = { io: child_reader }

    # This is used to rename error log files after all child processes are exited
    @child_tracker_persisted[child_pid] = { error_log_file: error_log_file }
    @ui.child_spawned(child_pid, invocation)

    # Close the file to unlock the error log files opened by processes
    error_log_file.close
  rescue StandardError => e
    $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
    $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
  ensure
    # The parent never writes to the pipe. Release its copy of the write end
    # (as the fork path does) so the reader can see end-of-file once the
    # child is done, and so one descriptor is not leaked per job.
    parent_writer.close unless parent_writer.closed?
    # Safety net for the failure path: do not leave the error log open.
    error_log_file.close if error_log_file && !error_log_file.closed?
  end
end
#update_ui_poll_select(target_pid = nil) ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 188 def update_ui_poll_select(target_pid = nil) # Focus on one pid's pipe if specified, otherwise poll all pipes pipes_for_reading = target_pid ? [ @child_tracker[target_pid][:io] ] : @child_tracker.values.map { |i| i[:io] } # Next line is due to a race between the close() and the wait()... shouldn't need it, but it fixes the race. pipes_for_reading.reject!(&:closed?) ready_pipes = IO.select(pipes_for_reading, [], [], 0.1) return unless ready_pipes ready_pipes[0].each do |pipe_ready_for_reading| # If we weren't provided a PID, hackishly look up the pid from the matching IO. pid = target_pid || @child_tracker.keys.detect { |p| @child_tracker[p][:io] == pipe_ready_for_reading } begin while (update_line = pipe_ready_for_reading.readline) && !pipe_ready_for_reading.closed? if update_line =~ /EOF_MARKER/ pipe_ready_for_reading.close break elsif update_line =~ /WARN/ || update_line =~ /ERROR/ || update_line =~ /INFO/ create_logs( pid, "#{Time.now.iso8601} Extra log: #{update_line}\n" ) break end update_ui_with_line(pid, update_line) unless run_in_background # Only pull one line if we are doing normal updates; slurp the whole file # if we are doing a final pull on a targeted PID break unless target_pid end rescue EOFError # On unix, readline throws an EOFError when we hit the end. On Windows, nothing apparently happens. pipe_ready_for_reading.close next end end # TODO: loop over ready_pipes[2] and handle errors? end |
#update_ui_with_line(pid, update_line) ⇒ Object
225 226 227 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 225 def update_ui_with_line(pid, update_line) @ui.child_status_update_line(pid, update_line) end |
#wait_for_child_processes_to_die ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 86

# Reaps exited children until none are tracked, or until the operating
# system reports that this process has no children left.
def wait_for_child_processes_to_die
  until @child_tracker.empty?
    begin
      exited_pid = Process.waitpid(-1, Process::WNOHANG)
      @child_tracker.delete exited_pid if exited_pid && exited_pid > 0
      sleep 1
    rescue Errno::ECHILD
      Inspec::Log.info "Processes shutdown complete!"
      # No children remain, so nothing can ever be reaped again. Without this
      # break, any pid still tracked would keep the loop spinning forever.
      break
    rescue StandardError => e
      # StandardError (not Exception) so an interrupt during the wait is not
      # swallowed.
      Inspec::Log.debug "Error while waiting for child processes to shutdown: #{e.message}"
    end
  end
end