Class: ProcessExecuter::MonitoredPipe

Inherits:
Object
Includes:
TrackOpenInstances
Defined in:
lib/process_executer/monitored_pipe.rb

Overview

Acts as a pipe that writes the data written to it to one or more destinations

MonitoredPipe was created to expand the output redirection options for Process.spawn and methods derived from it within the ProcessExecuter module.

This class's initializer accepts any redirection destination supported by Process.spawn (this is the value part of the file redirection option described in the File Redirection section of Process.spawn).

In addition to the standard redirection destinations, MonitoredPipe also supports these additional types of destinations:

  • Arbitrary Writers

    You can redirect subprocess output to any Ruby object that implements the #write method. This is particularly useful for:

    - capturing command output in in-memory buffers such as StringIO
    - sending command output to custom logging objects that do not have a file descriptor
    - parsing and processing command output with a streaming parser while the command is still running

  • Multiple Destinations

    MonitoredPipe supports duplicating (or "teeing") output to multiple destinations simultaneously. This is achieved by providing a redirection destination in the form [:tee, destination1, destination2, ...], where each destination can be any value that MonitoredPipe itself supports (including another tee or MonitoredPipe).
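The two extra destination types rely on duck typing: anything with a #write method can receive data, and a tee is just a writer that forwards each write to several others. The sketch below illustrates that idea in plain Ruby. LineCounter and SimpleTee are hypothetical classes written for this example; they are not the library's own Destinations implementation.

```ruby
require 'stringio'

# Hypothetical "arbitrary writer": it has no file descriptor, only #write.
# It counts newline characters instead of storing the data.
class LineCounter
  attr_reader :lines

  def initialize
    @lines = 0
  end

  def write(data)
    @lines += data.count("\n")
    data.bytesize # mimic IO#write, which returns the number of bytes written
  end
end

# Hypothetical tee: forwards every write to each destination in order.
# Because it also responds to #write, a SimpleTee can itself be nested
# inside another SimpleTee.
class SimpleTee
  def initialize(*destinations)
    @destinations = destinations
  end

  def write(data)
    @destinations.each { |destination| destination.write(data) }
    data.bytesize
  end
end

buffer = StringIO.new
counter = LineCounter.new
tee = SimpleTee.new(buffer, counter)
tee.write("line 1\nline 2\n")

buffer.string # => "line 1\nline 2\n"
counter.lines # => 2
```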

When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe. As data is read from the pipe, it is written to the destination provided in the MonitoredPipe initializer.
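The pipe-plus-thread mechanism can be sketched in a few lines of plain Ruby. SketchMonitoredPipe is a hypothetical, simplified class: it forwards chunks with IO#readpartial and stops at EOF, whereas the actual class also tracks a state, coordinates shutdown with a Mutex and ConditionVariable, and records destination exceptions.

```ruby
require 'stringio'

# Hypothetical, simplified monitored pipe (not the library's implementation).
class SketchMonitoredPipe
  def initialize(destination, chunk_size: 100_000)
    @destination = destination
    @reader, @writer = IO.pipe
    @thread = Thread.new do
      loop do
        # readpartial blocks until data is available and raises EOFError
        # once the write end is closed and the pipe has been drained.
        @destination.write(@reader.readpartial(chunk_size))
      end
    rescue EOFError
      # All data has been read and forwarded to the destination.
    ensure
      @reader.close
    end
  end

  # The write end, so the object can be handed to Process.spawn.
  def to_io
    @writer
  end

  def write(data)
    @writer.write(data)
  end

  # Closing the write end lets the thread drain the pipe and reach EOF;
  # join waits until every chunk has been written to the destination.
  def close
    @writer.close
    @thread.join
  end
end

buffer = StringIO.new
pipe = SketchMonitoredPipe.new(buffer)
begin
  pipe.write('Hello World')
ensure
  pipe.close
end
buffer.string # => "Hello World"
```

Joining the thread inside #close is what makes it safe to inspect the destination immediately afterwards, which is the same reason the warning about calling #close applies to the real class.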

If the destination raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in #exception.
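A rough sketch of that behavior: the monitoring thread rescues the destination's error, stores it, and exits instead of letting the exception escape the thread. FailingDestination and captured_exception are hypothetical names used only for this illustration.

```ruby
# Hypothetical destination that always fails.
class FailingDestination
  def write(_data)
    raise 'destination is unavailable'
  end
end

destination = FailingDestination.new
reader, writer = IO.pipe
captured_exception = nil

monitor = Thread.new do
  loop { destination.write(reader.readpartial(1024)) }
rescue EOFError
  # Writer closed and pipe drained: a normal shutdown, nothing to record.
rescue StandardError => e
  # Stop forwarding and keep the error, in the spirit of MonitoredPipe#exception.
  captured_exception = e
ensure
  reader.close
end

writer.write('some output')
monitor.join # the thread has exited because the destination raised
writer.close

captured_exception.message # => "destination is unavailable"
```

Note that EOFError is itself a subclass of IOError (and so of StandardError), which is why it is rescued first.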

⚠️ WARNING

#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the destination, and (3) the monitoring thread has exited.

Examples:

Collect pipe data into a StringIO object

require 'stringio'
pipe_data = StringIO.new
begin
  pipe = ProcessExecuter::MonitoredPipe.new(pipe_data)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data.string #=> "Hello World"

Collect pipe data into a string AND a file

require 'stringio'
pipe_data_string = StringIO.new
pipe_data_file = File.open("pipe_data.txt", "w")
begin
  pipe = ProcessExecuter::MonitoredPipe.new([:tee, pipe_data_string, pipe_data_file])
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data_string.string #=> "Hello World"
# It is your responsibility to close the file you opened
pipe_data_file.close
File.read("pipe_data.txt") #=> "Hello World"

Using a MonitoredPipe with Process.spawn

require 'stringio'
stdout_buffer = StringIO.new
begin
  stdout_pipe = ProcessExecuter::MonitoredPipe.new(stdout_buffer)
  pid = Process.spawn('echo Hello World', out: stdout_pipe)
  _waited_pid, status = Process.wait2(pid)
ensure
  stdout_pipe.close
end
stdout_buffer.string #=> "Hello World\n"

Constructor Details

#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe

Create a new monitored pipe

Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.

Examples:

redirection_destination = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(redirection_destination)

Parameters:

  • redirection_destination (Object)

    as data is read from the pipe, it is written to this destination

    Accepts any redirection destination supported by Process.spawn. This is the value part of the file redirection option described in the File Redirection section of Process.spawn.

    In addition to the standard redirection destinations, MonitoredPipe also accepts (1) another monitored pipe, (2) any object that implements a #write method and (3) an array in the form [:tee, destination1, destination2, ...] where each destination can be any value that MonitoredPipe itself supports (including another tee or MonitoredPipe).

  • chunk_size (Integer) (defaults to: 100_000)

    the size of the chunks to read from the pipe
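chunk_size caps how many bytes each read from the pipe returns. The sketch below shows the effect with IO#readpartial, which returns at most the requested number of bytes per call. The value 4 is chosen only to make the chunking visible; it is not a recommended setting, and the sketch does not use MonitoredPipe itself.

```ruby
reader, writer = IO.pipe
writer.write('abcdefghij')
writer.close # all 10 bytes are in the pipe before reading starts

chunk_size = 4
chunks = []
begin
  # Each call returns at most chunk_size bytes; EOFError ends the loop.
  loop { chunks << reader.readpartial(chunk_size) }
rescue EOFError
  reader.close
end

chunks # => ["abcd", "efgh", "ij"]
```

A larger chunk size means fewer, larger #write calls on the destination; a smaller one delivers data in finer increments.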

# File 'lib/process_executer/monitored_pipe.rb', line 122

def initialize(redirection_destination, chunk_size: 100_000)
  @destination = Destinations.factory(redirection_destination)

  assert_destination_is_compatible_with_monitored_pipe

  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe

  # Set the encoding of the pipe reader to ASCII_8BIT. This is not strictly
  # necessary since read_nonblock always returns a String where encoding is
  # Encoding::ASCII_8BIT, but it is a good practice to explicitly set the
  # encoding.
  pipe_reader.set_encoding(Encoding::ASCII_8BIT)

  @state = :open
  @thread = start_monitoring_thread

  self.class.add_open_instance(self)
end

Instance Attribute Details

#chunk_size ⇒ Integer (readonly)

The size of the chunks to read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.chunk_size #=> 100_000

Returns:

  • (Integer)

    the size of the chunks to read from the pipe

# File 'lib/process_executer/monitored_pipe.rb', line 247

def chunk_size
  @chunk_size
end

#destination ⇒ ProcessExecuter::Destinations::DestinationBase (readonly)

The redirection destination to write data that is read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.destination #=> #<ProcessExecuter::Destinations::Writer>

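Returns:

  • (ProcessExecuter::Destinations::DestinationBase)

    the redirection destination to write data that is read from the pipe
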
# File 'lib/process_executer/monitored_pipe.rb', line 261

def destination
  @destination
end

#exception ⇒ Exception? (readonly)

The exception raised by a destination

If an exception is raised by a destination, it is stored here. Otherwise, it is nil.

Examples:

pipe = ProcessExecuter::MonitoredPipe.new(StringIO.new)
pipe.exception #=> nil

Returns:

  • (Exception, nil)

    the exception raised by a destination or nil if no exception was raised

# File 'lib/process_executer/monitored_pipe.rb', line 294

def exception
  @exception
end

#fileno ⇒ Integer (readonly)

The file descriptor for the write end of the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.fileno == pipe.to_io.fileno #=> true

Returns:

  • (Integer)

    the file descriptor for the write end of the pipe

# File 'lib/process_executer/monitored_pipe.rb', line 205

def fileno
  pipe_writer.fileno
end

#pipe_reader ⇒ IO (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The read end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_reader #=> #<IO:fd 11>

Returns:

  • (IO)

    the read end of the pipe

# File 'lib/process_executer/monitored_pipe.rb', line 324

def pipe_reader
  @pipe_reader
end

#pipe_writer ⇒ IO (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The write end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_writer #=> #<IO:fd 12>

Returns:

  • (IO)

    the write end of the pipe

# File 'lib/process_executer/monitored_pipe.rb', line 338

def pipe_writer
  @pipe_writer
end

#state ⇒ Symbol (readonly)

The state of the pipe

Must be one of :open, :closing, or :closed

  • :open - the pipe is open and data can be written to it
  • :closing - the pipe is being closed and data can no longer be written to it
  • :closed - the pipe is closed and data can no longer be written to it

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.state #=> :open
pipe.close
pipe.state #=> :closed

Returns:

  • (Symbol)

    the state of the pipe

# File 'lib/process_executer/monitored_pipe.rb', line 281

def state
  @state
end

#thread ⇒ Thread (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The thread that monitors the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>

Returns:

  • (Thread)

    the thread that monitors the pipe

# File 'lib/process_executer/monitored_pipe.rb', line 310

def thread
  @thread
end

Instance Method Details

#close

This method returns an undefined value.

Set the state to :closing and wait for the state to be set to :closed

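The monitoring thread notices the change, finishes writing any remaining pipe data to the destination, closes the pipe, and sets the state to :closed. #close then joins the monitoring thread and closes the destination.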
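The handshake between #close and the monitoring thread can be sketched with a Mutex and ConditionVariable, following the close source shown in this section. The variable names and the drain helper are hypothetical, and the polling loop (IO.select with a short timeout plus read_nonblock) is a simplification chosen for this example.

```ruby
mutex = Mutex.new
condition_variable = ConditionVariable.new
state = :open
reader, writer = IO.pipe
received = +''

# Hypothetical helper: read everything currently available in the pipe.
def drain(reader, sink)
  loop { sink << reader.read_nonblock(1024) }
rescue IO::WaitReadable, EOFError
  # Nothing more to read right now.
end

monitor = Thread.new do
  until mutex.synchronize { state == :closing }
    IO.select([reader], nil, nil, 0.01) # wait briefly for data
    drain(reader, received)
  end
  # Final pass: anything written before the close request is already in the pipe.
  drain(reader, received)
  mutex.synchronize do
    state = :closed
    condition_variable.signal
  end
end

writer.write('Hello World')

# What #close does, in miniature: request :closing, then wait for :closed.
mutex.synchronize do
  if state == :open
    state = :closing
    condition_variable.wait(mutex) while state != :closed
  end
end
monitor.join
reader.close
writer.close

received # => "Hello World"
```

The `while` loop around ConditionVariable#wait guards against spurious wakeups, and because the state change and the wait happen under the same mutex, the monitoring thread cannot signal before #close starts waiting.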

Examples:

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.state #=> :open
pipe.write('Hello World')
pipe.close
pipe.state #=> :closed
data_collector.string #=> "Hello World"

# File 'lib/process_executer/monitored_pipe.rb', line 159

def close
  mutex.synchronize do
    if state == :open
      @state = :closing
      condition_variable.wait(mutex) while @state != :closed
    end
  end

  thread.join
  destination.close
  self.class.remove_open_instance(self)
end

#to_io ⇒ IO

Return the write end of the pipe so that data can be written to it

Data written to this end of the pipe will be read by the monitor thread and written to the destination.

This allows a MonitoredPipe to be passed to Process.spawn as a redirection target (for example, out: pipe), where it is used as a file descriptor.
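To illustrate why #to_io is enough: Process.spawn accepts a redirection value that responds to #to_io, so any object exposing the write end of a pipe can be passed as out:. PipeWrapper is a hypothetical class for this sketch, and the example assumes an echo executable is on the PATH.

```ruby
# Hypothetical wrapper that, like MonitoredPipe, exposes a pipe's write end via #to_io.
class PipeWrapper
  attr_reader :reader

  def initialize
    @reader, @writer = IO.pipe
  end

  def to_io
    @writer
  end
end

wrapper = PipeWrapper.new
pid = Process.spawn('echo', 'Hello World', out: wrapper)
Process.wait(pid)
wrapper.to_io.close          # close the parent's copy of the write end
output = wrapper.reader.read # reads until EOF
wrapper.reader.close

output # => "Hello World\n"
```

Closing the parent's copy of the write end is what lets the reader see EOF; MonitoredPipe#close takes care of the equivalent step for you.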

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.to_io.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Returns:

  • (IO)

    the write end of the pipe

# File 'lib/process_executer/monitored_pipe.rb', line 189

def to_io
  pipe_writer
end

#write(data) ⇒ Integer

Writes data to the pipe so that it can be read by the monitor thread

Primarily used for testing.
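The guard in #write can be sketched as a state check performed under a mutex. GuardedWriter is a hypothetical class that mirrors the state check and the IOError('closed stream') from the #write source in this section, but writes to any IO-like object rather than to a pipe.

```ruby
require 'stringio'

# Hypothetical writer that only accepts data while its state is :open.
class GuardedWriter
  def initialize(io)
    @io = io
    @mutex = Mutex.new
    @state = :open
  end

  def write(data)
    @mutex.synchronize do
      raise IOError, 'closed stream' unless @state == :open

      @io.write(data) # returns the number of bytes written
    end
  end

  def close
    @mutex.synchronize { @state = :closed }
  end
end

writer = GuardedWriter.new(StringIO.new)
bytes = writer.write('Hello') # => 5
writer.close
error_message =
  begin
    writer.write('again')
    nil
  rescue IOError => e
    e.message
  end
error_message # => "closed stream"
```

Holding the mutex for both the check and the write means a concurrent #close cannot change the state between the two.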

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Parameters:

  • data (String)

    the data to write to the pipe

Returns:

  • (Integer)

    the number of bytes written to the pipe

Raises:

  • (IOError)

    if the pipe is not open

# File 'lib/process_executer/monitored_pipe.rb', line 227

def write(data)
  mutex.synchronize do
    raise IOError, 'closed stream' unless state == :open

    pipe_writer.write(data)
  end
end