Class: Proxy::Dynflow::ProcessManager

Inherits:
Object
Defined in:
lib/smart_proxy_dynflow/process_manager.rb

Overview

An abstraction for managing local processes.

It can be used to:

  • spawn a local process

  • track its lifecycle

  • communicate with it through its standard input, output and error

  • step through the execution one event at a time or start the child process and wait until it finishes

Examples:

Run date command and collect its output

pm = ProcessManager.new('date')
pm.run!
pm.status #=> 0
pm.stdout.to_s.chomp #=> "Thu Feb  3 04:27:42 PM CET 2022"

Run a shell loop, outputting all the lines it generates

pm = ProcessManager.new(['/bin/sh', '-c', 'for i in 1 2 3; do echo $i; sleep 1; done'])
pm.on_stdout { |data| puts data; '' }
pm.run!
#=> 1
#=> 2
#=> 3

Run bc (calculator) interactively and count down from 10 to 0

pm = ProcessManager.new('bc')
pm.on_stdout do |data|
  if data.match?(/^\d+/)
    n = data.to_i
    if n.zero?
      pm.stdin.to_io.close
    else
      pm.stdin.add_data("#{n} - 1\n")
    end
  end
  data
end
pm.stdin.add_data("10\n")
pm.run!
pm.stdout.to_s.lines #=> ["10\n", "9\n", "8\n", "7\n", "6\n", "5\n", "4\n", "3\n", "2\n", "1\n", "0\n"]

Constructor Details

#initialize(command) ⇒ ProcessManager

Returns a new instance of ProcessManager.

Parameters:

  • command (String, [String], [Hash, String])

    A command to run in one of the forms accepted by Kernel.spawn



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 55

def initialize(command)
  @command = command
  @stdin  = IOBuffer.new(nil)
  @stdout = IOBuffer.new(nil)
  @stderr = IOBuffer.new(nil)
end
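
The command forms listed above follow Kernel.spawn. The sketch below illustrates them using only the Ruby standard library; it is not part of ProcessManager. run_and_capture is a hypothetical helper, and the example assumes a POSIX shell and an echo binary are available. It splats the command into spawn in the same way the source of #start! does.

```ruby
# Hypothetical helper (not part of ProcessManager): spawns a command given in
# any form accepted by Kernel.spawn and returns what it wrote to stdout.
def run_and_capture(command)
  reader, writer = IO.pipe
  # Splatting a String yields the string itself; splatting an Array spreads
  # its elements as separate arguments to spawn.
  pid = spawn(*command, out: writer)
  writer.close
  output = reader.read
  Process.wait(pid)
  output
ensure
  reader&.close
end

# A single String: the whole command line.
string_form = run_and_capture('echo hello')
# An Array of Strings: the program followed by its arguments.
array_form  = run_and_capture(['/bin/sh', '-c', 'echo hello'])
# An Array starting with a Hash: environment variables for the child.
env_form    = run_and_capture([{ 'GREETING' => 'hello' }, 'echo $GREETING'])
```

Under these assumptions, all three calls should capture the same single line of output.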

Instance Attribute Details

#pid ⇒ nil, Integer (readonly)

Process ID of the child process: nil if the process has not been started yet, -1 if the process could not be started

Returns:

  • (nil, Integer)

    the current value of pid



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 51

def pid
  @pid
end

#status ⇒ nil, Integer (readonly)

Exit status of the child process: nil if the child process has not finished yet, 255 if the process could not be started

Returns:

  • (nil, Integer)

    the current value of status



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 51

def status
  @status
end
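
The sentinel values documented for pid and status (-1 and 255 when the process cannot be started) can be illustrated with the standard library alone. This is a minimal sketch, not the ProcessManager implementation: spawn_and_wait is a hypothetical helper, the command path is made up for illustration, and the example assumes /bin/sh exists.

```ruby
# Hypothetical helper mirroring the documented sentinel values: a command
# that cannot be found is reported as pid -1 with exit status 255.
def spawn_and_wait(command)
  pid = spawn(*command, out: File::NULL, err: File::NULL)
  _, process_status = Process.wait2(pid)
  { pid: pid, status: process_status.exitstatus }
rescue Errno::ENOENT
  # Kernel.spawn raises Errno::ENOENT when the executable does not exist.
  { pid: -1, status: 255 }
end

missing = spawn_and_wait('/nonexistent/command-for-illustration')
present = spawn_and_wait(['/bin/sh', '-c', 'exit 3'])
```

A caller can then tell a failed start apart from a normal run by checking for a pid of -1.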

#stderr ⇒ Proxy::Dynflow::IOBuffer (readonly)

IOBuffer buffering reads from child process’ standard error

Returns:

  • (Proxy::Dynflow::IOBuffer)

    the current value of stderr



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 51

def stderr
  @stderr
end

#stdin ⇒ Proxy::Dynflow::IOBuffer (readonly)

IOBuffer buffering writes to child process’ standard input

Returns:

  • (Proxy::Dynflow::IOBuffer)

    the current value of stdin



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 51

def stdin
  @stdin
end

#stdout ⇒ Proxy::Dynflow::IOBuffer (readonly)

IOBuffer buffering reads from child process’ standard output

Returns:

  • (Proxy::Dynflow::IOBuffer)

    the current value of stdout



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 51

def stdout
  @stdout
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Makes the process manager close all the pipes it may have opened to communicate with the child process



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 159

def close
  [@stdin, @stdout, @stderr].each(&:close)
end

#done? ⇒ true, false

Determines whether the child process of the process manager has already finished

Returns:

  • (true, false)

    whether the child process of the process manager has already finished



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 104

def done?
  started? && !status.nil?
end

#on_stderr(&block) ⇒ void

This method returns an undefined value.

Sets block to be executed each time data is read from child process’ standard error



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 152

def on_stderr(&block)
  @stderr.on_data(&block)
end

#on_stdout(&block) ⇒ void

This method returns an undefined value.

Sets block to be executed each time data is read from child process’ standard output



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 145

def on_stdout(&block)
  @stdout.on_data(&block)
end
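
Judging by the examples in the overview, the value returned by the block appears to be what ends up in the buffer: returning '' discards the data, while returning data keeps it. The sketch below illustrates that pattern with a hypothetical SketchBuffer class. It is a stand-in written for this example, not the real Proxy::Dynflow::IOBuffer, and its receive method is an assumption about how such a hook could work.

```ruby
# Hypothetical stand-in for a buffer with an on_data hook. It is NOT the
# real Proxy::Dynflow::IOBuffer, only an illustration of the pattern.
class SketchBuffer
  def initialize
    @buffer = +''
    @callback = nil
  end

  # Registers a block to be called with every chunk of incoming data.
  def on_data(&block)
    @callback = block
  end

  # Assumption: the block's return value replaces the chunk that is stored.
  def receive(chunk)
    chunk = @callback.call(chunk) if @callback
    @buffer << chunk
  end

  def to_s
    @buffer
  end
end

# Returning '' from the block consumes the data without buffering it.
seen = []
discarding = SketchBuffer.new
discarding.on_data { |data| seen << data; '' }
discarding.receive("1\n")
discarding.receive("2\n")

# Returning a value from the block stores that value instead.
keeping = SketchBuffer.new
keeping.on_data { |data| data.upcase }
keeping.receive("abc\n")
```

If the real buffer behaves this way, a block that only wants to observe the output should return data unchanged so that pm.stdout.to_s still contains it afterwards.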

#process(timeout: nil) ⇒ void

This method returns an undefined value.

Runs a single iteration of the manager’s processing loop. It waits until either:

  • data is available in pipes connected to the child process’ standard output or error

  • there is pending data to be written and the pipe connected to the child process’ standard input is writable

  • a timeout is reached

After the wait, all pending data is read and written.

If all the pipes connected to the child process are closed, it marks the execution as complete and performs cleanup.

Parameters:

  • timeout (nil, Numeric) (defaults to: nil)

    controls how long this call should wait for data to become available. Waits indefinitely if nil.



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 120

def process(timeout: nil)
  raise 'Cannot process until the manager is started' unless started?

  writers = [@stdin].reject { |buf| buf.empty? || buf.closed? }
  readers = [@stdout, @stderr].reject(&:closed?)

  if readers.empty? && writers.empty?
    finish
    return
  end

  # Even though the FDs are still open, the child might have exited already
  pid, status = Process.waitpid2(@pid, Process::WNOHANG)
  timeout = 1 if pid

  ready_readers, ready_writers = IO.select(readers, writers, nil, timeout)
  (ready_readers || []).each(&:read_available!)
  (ready_writers || []).each(&:write_available!)

  finish(status) if pid
end
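
The wait-then-read cycle described for #process can be sketched with plain pipes and IO.select. This is a simplified illustration using only the standard library, not the ProcessManager code: it handles reading only, uses a fixed one-second timeout, and assumes /bin/sh is available. The variable names are chosen for the example.

```ruby
out_read, out_write = IO.pipe
err_read, err_write = IO.pipe
pid = spawn('/bin/sh', '-c', 'echo out; echo err >&2',
            out: out_write, err: err_write)
# The parent must close its copies of the write ends to ever see EOF.
[out_write, err_write].each(&:close)

collected = { out_read => +'', err_read => +'' }
open_readers = [out_read, err_read]

until open_readers.empty?
  # One iteration: wait up to one second for either pipe to become readable.
  ready_readers, = IO.select(open_readers, nil, nil, 1)
  (ready_readers || []).each do |io|
    collected[io] << io.read_nonblock(4096)
  rescue IO::WaitReadable
    next # nothing to read after all, try again on the next iteration
  rescue EOFError
    # The child closed its end of the pipe, so stop watching it.
    io.close
    open_readers.delete(io)
  end
end

# Once every pipe is closed, reap the child and collect its exit status.
_, process_status = Process.wait2(pid)
stdout_text = collected[out_read]
stderr_text = collected[err_read]
```

Each pass through the loop corresponds roughly to one call to #process; the loop ends once both pipes have reached EOF.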

#run! ⇒ ProcessManager

Starts the process manager and runs it until it finishes

Returns:

  • (ProcessManager)

    the process manager itself to allow method chaining



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 65

def run!
  start! unless started?
  process until done?
  self
end

#start! ⇒ void

This method returns an undefined value.

Starts the child process. It creates three pipes for communicating with the child process and then forks it. The process manager is considered done if the child process cannot be started.



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 76

def start!
  in_read,  in_write  = IO.pipe
  out_read, out_write = IO.pipe
  err_read, err_write = IO.pipe

  @stdin.io  = in_write
  @stdout.io = out_read
  @stderr.io = err_read

  @pid = spawn(*@command, :in => in_read, :out => out_write, :err => err_write)
  [in_read, out_write, err_write].each(&:close)
rescue Errno::ENOENT => e
  [in_read, in_write, out_read, out_write, err_read, err_write].each(&:close)
  @pid = -1
  @status = 255
  @stderr.add_data(e.message)
end
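
The three-pipe setup can be tried in isolation with the standard library. The sketch below follows the same wiring as the source above but talks to the pipes directly instead of going through IOBuffer. It assumes a cat binary is on the PATH, and the variable names echoed and errors are chosen for the example.

```ruby
# One pipe per standard stream: the child gets one end, the parent the other.
in_read,  in_write  = IO.pipe
out_read, out_write = IO.pipe
err_read, err_write = IO.pipe

pid = spawn('cat', in: in_read, out: out_write, err: err_write)
# Close the child's ends in the parent; otherwise reads never reach EOF.
[in_read, out_write, err_write].each(&:close)

# Feed the child through its standard input, then signal end of input.
in_write.write("hello\n")
in_write.close

echoed = out_read.read
errors = err_read.read
[out_read, err_read].each(&:close)

_, process_status = Process.wait2(pid)
```

Closing the child's ends of the pipes in the parent is the step that is easiest to forget: while the parent still holds a write end open, a read on the matching read end cannot reach end of file.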

#started? ⇒ true, false

Determines whether the process manager has already forked off its child process

Returns:

  • (true, false)

    whether the process manager has already forked off its child process



# File 'lib/smart_proxy_dynflow/process_manager.rb', line 97

def started?
  !pid.nil?
end