Class: ProcessExecuter::MonitoredPipe

Inherits:
Object
  • Object
show all
Defined in:
lib/process_executer/monitored_pipe.rb

Overview

Write data sent through a pipe to a destination

When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe.

If the destination raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in #exception.

#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the destination, and (3) the monitoring thread is killed.

Examples:

Collect pipe data into a string

pipe_data = StringIO.new
begin
  pipe = MonitoredPipe.new(pipe_data)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data.string #=> "Hello World"

Collect pipe data into a string AND a file

pipe_data_string = StringIO.new
pipe_data_file = File.open("pipe_data.txt", "w")
begin
  pipe = MonitoredPipe.new(pipe_data_string, pipe_data_file)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data_string.string #=> "Hello World"
File.read("pipe_data.txt") #=> "Hello World"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe

Create a new monitored pipe

Creates a IO.pipe and starts a monitoring thread to read data written to the pipe.

Examples:

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)

Parameters:

  • redirection_destination (Array<#write>)

    as data is read from the pipe, it is written to this destination

  • chunk_size (Integer) (defaults to: 100_000)

    the size of the chunks to read from the pipe



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/process_executer/monitored_pipe.rb', line 56

def initialize(redirection_destination, chunk_size: 100_000)
  @destination = Destinations.factory(redirection_destination)

  assert_destination_is_compatible_with_monitored_pipe

  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = Thread.new do
    Thread.current.report_on_exception = false
    Thread.current.abort_on_exception = false
    monitor
  end
end

Instance Attribute Details

#chunk_sizeInteger (readonly)

The size of the chunks to read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.chunk_size #=> 1000

Returns:

  • (Integer)

    the size of the chunks to read from the pipe



172
173
174
# File 'lib/process_executer/monitored_pipe.rb', line 172

def chunk_size
  @chunk_size
end

#destinationArray<ProcessExecuter::Destination::Base> (readonly)

The redirection destination to write data that is read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.destination #=>

Returns:

  • (Array<ProcessExecuter::Destination::Base>)


186
187
188
# File 'lib/process_executer/monitored_pipe.rb', line 186

def destination
  @destination
end

#exceptionException? (readonly)

The exception raised by a destination

If an exception is raised by a destination, it is stored here. Otherwise, it is nil.

Examples:

pipe.exception #=> nil

Returns:

  • (Exception, nil)

    the exception raised by a destination or nil if no exception was raised



254
255
256
# File 'lib/process_executer/monitored_pipe.rb', line 254

def exception
  @exception
end

#filenoInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The file descriptor for the write end of the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.fileno == pipe.to_io.fileno #=> true

Returns:

  • (Integer)

    the file descriptor for the write end of the pipe



132
133
134
# File 'lib/process_executer/monitored_pipe.rb', line 132

def fileno
  pipe_writer.fileno
end

#pipe_readerIO (readonly)

The read end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_reader #=> #<IO:fd 11>

Returns:

  • (IO)


210
211
212
# File 'lib/process_executer/monitored_pipe.rb', line 210

def pipe_reader
  @pipe_reader
end

#pipe_writerIO (readonly)

The write end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_writer #=> #<IO:fd 12>

Returns:

  • (IO)

    the write end of the pipe



221
222
223
# File 'lib/process_executer/monitored_pipe.rb', line 221

def pipe_writer
  @pipe_writer
end

#stateSymbol (readonly)

The state of the pipe

Must be either :open, :closing, or :closed

  • :open - the pipe is open and data can be written to it
  • :closing - the pipe is being closed and data can no longer be written to it
  • :closed - the pipe is closed and data can no longer be written to it

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.state #=> :open
pipe.close
pipe.state #=> :closed

Returns:

  • (Symbol)

    the state of the pipe



241
242
243
# File 'lib/process_executer/monitored_pipe.rb', line 241

def state
  @state
end

#threadThread (readonly)

The thread that monitors the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>

Returns:

  • (Thread)


199
200
201
# File 'lib/process_executer/monitored_pipe.rb', line 199

def thread
  @thread
end

Instance Method Details

#close

This method returns an undefined value.

Set the state to :closing and wait for the state to be set to :closed

The monitoring thread will see that the state has changed and will close the pipe.

Examples:

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.state #=> :open
pipe.write('Hello World')
pipe.close
pipe.state #=> :closed
data_collector.string #=> "Hello World"


86
87
88
89
90
91
92
93
# File 'lib/process_executer/monitored_pipe.rb', line 86

def close
  return unless state == :open

  @state = :closing
  sleep 0.001 until state == :closed

  destination.close
end

#to_ioIO

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the write end of the pipe so that data can be written to it

Data written to this end of the pipe will be read by the monitor thread and written to the destination.

This is so we can provide a MonitoredPipe to Process.spawn as a FD

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.to_io.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Returns:

  • (IO)

    the write end of the pipe



114
115
116
# File 'lib/process_executer/monitored_pipe.rb', line 114

def to_io
  pipe_writer
end

#write(data) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes data to the pipe so that it can be read by the monitor thread

Primarily used for testing.

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Parameters:

  • data (String)

    the data to write to the pipe

Returns:

  • (Integer)

    the number of bytes written to the pipe

Raises:

  • (IOError)


154
155
156
157
158
# File 'lib/process_executer/monitored_pipe.rb', line 154

def write(data)
  raise IOError, 'closed stream' unless state == :open

  pipe_writer.write(data)
end