Class: InspecPlugins::Parallelism::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(invocations, cli_options, sub_cmd = "exec") ⇒ Runner

Returns a new instance of Runner.



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 10

def initialize(invocations, cli_options, sub_cmd = "exec")
  @invocations = invocations
  @sub_cmd = sub_cmd
  @total_jobs = cli_options["jobs"] || Concurrent.physical_processor_count
  @child_tracker = {}
  @child_tracker_persisted = {}
  @run_in_background = cli_options["bg"]
  unless run_in_background
    @ui = InspecPlugins::Parallelism::SuperReporter.make(cli_options["ui"], total_jobs, invocations)
  end
  @log_path = cli_options["log_path"]
end

Instance Attribute Details

#invocationsObject

Returns the value of attribute invocations.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

def invocations
  @invocations
end

#log_pathObject

Returns the value of attribute log_path.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

def log_path
  @log_path
end

#run_in_backgroundObject

Returns the value of attribute run_in_background.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

def run_in_background
  @run_in_background
end

#sub_cmdObject

Returns the value of attribute sub_cmd.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

def sub_cmd
  @sub_cmd
end

#total_jobsObject

Returns the value of attribute total_jobs.



8
9
10
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 8

def total_jobs
  @total_jobs
end

Instance Method Details

#cleanup_child_processesObject

Still in parent Loop over children and check for finished processes



173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 173

def cleanup_child_processes
  @child_tracker.each do |pid, info|
    if Process.wait(pid, Process::WNOHANG)
      # Expect to (probably) find EOF marker on the pipe, and close it if so
      update_ui_poll_select(pid)

      create_logs(pid, "#{Time.now.iso8601} Exit code: #{$?}\n")

      # child exited - status in $?
      @ui.child_exited(pid) unless run_in_background
      @child_tracker.delete pid
    end
  end
end

#cleanup_daemon_processObject



59
60
61
62
63
64
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 59

def cleanup_daemon_process
  current_process_id = Process.pid
  Process.kill(9, current_process_id)
  # DO NOT TRY TO REFACTOR IT THIS WAY
  # Calling Process.kill(9,Process.pid) kills the "stopper" process itself, rather than the one it's trying to stop.
end

#cleanup_empty_error_log_filesObject



66
67
68
69
70
71
72
73
74
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 66

def cleanup_empty_error_log_files
  logs_dir_path = log_path || Dir.pwd
  error_files = Dir.glob("#{logs_dir_path}/logs/*.err")
  error_files.each do |error_file|
    if File.exist?(error_file) && !File.size?(error_file)
      File.delete(error_file)
    end
  end
end

#fork_another_processObject



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 139

def fork_another_process
  invocation = invocations.shift[:value] # Be sure to do this shift() in parent process
  # thing_that_reads_from_the_child, thing_that_writes_to_the_parent = IO.pipe
  child_reader, parent_writer = IO.pipe
  if (child_pid = Process.fork)
    # In parent with newly forked child
    parent_writer.close
    @child_tracker[child_pid] = { io: child_reader }
    @ui.child_forked(child_pid, invocation) unless run_in_background
  else
    # In child
    child_reader.close
    # replace stdout with writer
    $stdout = parent_writer
    create_logs(Process.pid, nil, $stderr)

    begin
      create_logs(
        Process.pid,
        "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
      )
      runner_invocation(invocation)
    rescue StandardError => e
      $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
      $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
    end

    # should be unreachable but child MUST exit
    exit(42)
  end
end

#initiate_background_runObject



51
52
53
54
55
56
57
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 51

def initiate_background_run
  if Inspec.locally_windows?
    Inspec::UI.new.exit(:usage_error)
  else
    Process.daemon(true, true)
  end
end

#kill_child_processesObject



76
77
78
79
80
81
82
83
84
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 76

def kill_child_processes
  @child_tracker.each do |pid, info|
    Process.kill("SIGKILL", pid)
  rescue Exception => e
    $stderr.puts "Error while shutting down process #{pid}: #{e.message}"
  end
  # Waiting for child processes to die after they have been killed
  wait_for_child_processes_to_die
end

#rename_error_log_filesObject



100
101
102
103
104
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 100

def rename_error_log_files
  @child_tracker_persisted.each do |pid, info|
    rename_error_log(info[:error_log_file], pid)
  end
end

#runObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 23

def run
  initiate_background_run if run_in_background # running a process as daemon changes parent process pid
  original_stdout_stream = ChefLicensing::Config.output
  until invocations.empty? && @child_tracker.empty?
    # Changing output to STDERR to avoid the output interruption between runs
    ChefLicensing::Config.output = STDERR
    while should_start_more_jobs?
      if Inspec.locally_windows?
        spawn_another_process
      else
        fork_another_process
      end
    end

    update_ui_poll_select
    cleanup_child_processes
    sleep 0.1
  end
  # Reset output to the original STDOUT stream as a safe measure.
  ChefLicensing::Config.output = original_stdout_stream

  # Requires renaming operations on windows only
  # Do Rename and delete operations after all child processes have exited successfully
  rename_error_log_files if Inspec.locally_windows?
  cleanup_empty_error_log_files
  cleanup_daemon_process if run_in_background
end

#should_start_more_jobs?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 106

def should_start_more_jobs?
  @child_tracker.length < total_jobs && !invocations.empty?
end

#spawn_another_processObject



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 110

def spawn_another_process
  invocation = invocations.shift[:value]

  child_reader, parent_writer = IO.pipe
  begin
    logs_dir_path = log_path || Dir.pwd
    log_dir = File.join(logs_dir_path, "logs")
    FileUtils.mkdir_p(log_dir)
    error_log_file = File.open("#{log_dir}/#{Time.now.nsec}.err", "a+")
    cmd = "#{$0} #{sub_cmd} #{invocation}"
    log_msg = "#{Time.now.iso8601} Start Time: #{Time.now}\n#{Time.now.iso8601} Arguments: #{invocation}\n"
    child_pid = Process.spawn(cmd, out: parent_writer, err: error_log_file.path)

    # Logging
    create_logs(child_pid, log_msg)
    @child_tracker[child_pid] = { io: child_reader }

    # This is used to rename error log files after all child processes are exited
    @child_tracker_persisted[child_pid] = { error_log_file: error_log_file }
    @ui.child_spawned(child_pid, invocation)

    # Close the file to unlock the error log files opened by processes
    error_log_file.close
  rescue StandardError => e
    $stderr.puts "#{Time.now.iso8601} Error Message: #{e.message}"
    $stderr.puts "#{Time.now.iso8601} Error Backtrace: #{e.backtrace}"
  end
end

#update_ui_poll_select(target_pid = nil) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 188

def update_ui_poll_select(target_pid = nil)
  # Focus on one pid's pipe if specified, otherwise poll all pipes
  pipes_for_reading = target_pid ? [ @child_tracker[target_pid][:io] ] : @child_tracker.values.map { |i| i[:io] }
  # Next line is due to a race between the close() and the wait()... shouldn't need it, but it fixes the race.
  pipes_for_reading.reject!(&:closed?)
  ready_pipes = IO.select(pipes_for_reading, [], [], 0.1)
  return unless ready_pipes

  ready_pipes[0].each do |pipe_ready_for_reading|
    # If we weren't provided a PID, hackishly look up the pid from the matching IO.
    pid = target_pid || @child_tracker.keys.detect { |p| @child_tracker[p][:io] == pipe_ready_for_reading }
    begin
      while (update_line = pipe_ready_for_reading.readline) && !pipe_ready_for_reading.closed?
        if update_line =~ /EOF_MARKER/
          pipe_ready_for_reading.close
          break
        elsif update_line =~ /WARN/ || update_line =~ /ERROR/ || update_line =~ /INFO/
          create_logs(
            pid,
            "#{Time.now.iso8601} Extra log: #{update_line}\n"
          )
          break
        end
        update_ui_with_line(pid, update_line) unless run_in_background
        # Only pull one line if we are doing normal updates; slurp the whole file
        # if we are doing a final pull on a targeted PID
        break unless target_pid
      end
    rescue EOFError
      # On unix, readline throws an EOFError when we hit the end. On Windows, nothing apparently happens.
      pipe_ready_for_reading.close
      next
    end
  end
  # TODO: loop over ready_pipes[2] and handle errors?
end

#update_ui_with_line(pid, update_line) ⇒ Object



225
226
227
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 225

def update_ui_with_line(pid, update_line)
  @ui.child_status_update_line(pid, update_line)
end

#wait_for_child_processes_to_dieObject



86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/plugins/inspec-parallel/lib/inspec-parallel/runner.rb', line 86

def wait_for_child_processes_to_die
  until @child_tracker.empty?
    begin
      exited_pid = Process.waitpid(-1, Process::WNOHANG)
      @child_tracker.delete exited_pid if exited_pid && exited_pid > 0
      sleep 1
    rescue Errno::ECHILD
      Inspec::Log.info "Processes shutdown complete!"
    rescue Exception => e
      Inspec::Log.debug "Error while waiting for child processes to shutdown: #{e.message}"
    end
  end
end