Class: ProcessExecuter::MonitoredPipe

Inherits:
Object
Defined in:
lib/process_executer/monitored_pipe.rb

Overview

Stream data sent through a pipe to one or more writers

When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe.

Data that is read from that pipe is written to one or more writers passed to #initialize.

If any of the writers raise an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in #exception.

#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the writers, and (3) the monitoring thread is killed.

Examples:

Collect pipe data into a string

require 'stringio'
pipe_data = StringIO.new
begin
  pipe = ProcessExecuter::MonitoredPipe.new(pipe_data)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data.string #=> "Hello World"

Collect pipe data into a string AND a file

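require 'stringio'
pipe_data_string = StringIO.new
pipe_data_file = File.open("pipe_data.txt", "w")
begin
  pipe = ProcessExecuter::MonitoredPipe.new(pipe_data_string, pipe_data_file)
  pipe.write("Hello World")
ensure
  pipe.close
  pipe_data_file.close # flush buffered writes so File.read sees them
end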
pipe_data_string.string #=> "Hello World"
File.read("pipe_data.txt") #=> "Hello World"
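
The mechanism described in the overview can be approximated with the standard library alone. The following is a minimal sketch, not the gem's implementation: the names reader, writer, sinks, and monitor are illustrative, and the 1024-byte read size is an arbitrary assumption.

```ruby
require 'stringio'

# Illustrative names only; this is not ProcessExecuter's code.
reader, writer = IO.pipe
sinks = [StringIO.new, StringIO.new]

# Background thread: copy every chunk read from the pipe to each sink.
monitor = Thread.new do
  begin
    loop do
      chunk = reader.readpartial(1024) # arbitrary read size
      sinks.each { |sink| sink.write(chunk) }
    end
  rescue EOFError
    # Raised once the write end is closed and the pipe is empty.
  ensure
    reader.close
  end
end

writer.write('Hello World')
writer.close # signals end-of-file to the monitoring thread
monitor.join # wait until all data has been forwarded

sinks.map(&:string) #=> ["Hello World", "Hello World"]
```

In this sketch, closing the write end is what ends the thread. MonitoredPipe instead exposes #close, which is why the overview stresses that #close must be called.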

Constructor Details

#initialize(*writers, chunk_size: 100_000) ⇒ MonitoredPipe

Create a new monitored pipe

Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)

Parameters:

  • writers (Array<#write>)

    as data is read from the pipe, it is written to these writers

  • chunk_size (Integer) (defaults to: 100_000)

    the size of the chunks to read from the pipe



# File 'lib/process_executer/monitored_pipe.rb', line 58

def initialize(*writers, chunk_size: 100_000)
  @writers = writers
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = Thread.new do
    Thread.current.report_on_exception = false
    Thread.current.abort_on_exception = false
    monitor
  end
end

Instance Attribute Details

#chunk_size ⇒ Integer (readonly)

The size of the chunks to read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.chunk_size #=> 100000

Returns:

  • (Integer)

    the size of the chunks to read from the pipe



# File 'lib/process_executer/monitored_pipe.rb', line 169

def chunk_size
  @chunk_size
end
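
To illustrate what a chunk size means for pipe reads, here is a small sketch using only IO.pipe. It assumes a readpartial-style read; the read call the gem actually uses is not shown on this page, and the local variable chunk_size merely stands in for the attribute.

```ruby
reader, writer = IO.pipe
writer.write('x' * 25)
writer.close

chunk_size = 10 # stand-in for MonitoredPipe#chunk_size
chunk_lengths = []
begin
  # Each read returns at most chunk_size bytes.
  loop { chunk_lengths << reader.readpartial(chunk_size).bytesize }
rescue EOFError
  # The pipe has been drained and the write end is closed.
ensure
  reader.close
end

chunk_lengths #=> [10, 10, 5]
```

A larger chunk size means fewer reads (and fewer calls to each writer) for the same amount of data.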

#exception ⇒ Exception? (readonly)

The exception raised by a writer

If an exception is raised by a writer, it is stored here. Otherwise, it is nil.

Examples:

pipe.exception #=> nil

Returns:

  • (Exception, nil)

    the exception raised by a writer or nil if no exception was raised



# File 'lib/process_executer/monitored_pipe.rb', line 258

def exception
  @exception
end
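
The idea of saving a writer's exception instead of letting it escape the monitoring thread can be sketched as follows. FailingWriter and the captured variable are hypothetical names used for illustration; this is an assumption about the general pattern, not the gem's code.

```ruby
# Hypothetical writer that always fails.
class FailingWriter
  def write(_data)
    raise IOError, 'disk full (simulated)'
  end
end

reader, writer = IO.pipe
captured = nil # plays the role of MonitoredPipe#exception

monitor = Thread.new do
  # Keep Ruby from printing the failure; it is stored instead.
  Thread.current.report_on_exception = false
  begin
    chunk = reader.readpartial(1024)
    FailingWriter.new.write(chunk)
  rescue StandardError => e
    captured = e
  ensure
    reader.close
  end
end

writer.write('Hello World')
monitor.join
writer.close

captured.class   #=> IOError
captured.message #=> "disk full (simulated)"
```

Because the failure happens on another thread, the caller only learns about it by inspecting the stored exception afterwards.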

#fileno ⇒ Integer (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The file descriptor for the write end of the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.fileno == pipe.to_io.fileno #=> true

Returns:

  • (Integer)

    the file descriptor for the write end of the pipe



# File 'lib/process_executer/monitored_pipe.rb', line 129

def fileno
  pipe_writer.fileno
end

#pipe_reader ⇒ IO (readonly)

The read end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_reader #=> #<IO:fd 11>

Returns:

  • (IO)


# File 'lib/process_executer/monitored_pipe.rb', line 214

def pipe_reader
  @pipe_reader
end

#pipe_writer ⇒ IO (readonly)

The write end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_writer #=> #<IO:fd 12>

Returns:

  • (IO)

    the write end of the pipe



# File 'lib/process_executer/monitored_pipe.rb', line 225

def pipe_writer
  @pipe_writer
end

#state ⇒ Symbol (readonly)

The state of the pipe

Must be one of :open, :closing, or :closed

  • :open - the pipe is open and data can be written to it
  • :closing - the pipe is being closed and data can no longer be written to it
  • :closed - the pipe is closed and data can no longer be written to it

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.state #=> :open
pipe.close
pipe.state #=> :closed

Returns:

  • (Symbol)

    the state of the pipe



# File 'lib/process_executer/monitored_pipe.rb', line 245

def state
  @state
end

#thread ⇒ Thread (readonly)

The thread that monitors the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>

Returns:

  • (Thread)


# File 'lib/process_executer/monitored_pipe.rb', line 203

def thread
  @thread
end

#writers ⇒ Array<#write> (readonly)

An array of writers to write data that is read from the pipe

Examples:

with one writer

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.writers #=> [data_collector]

with an array of writers

require 'stringio'
data_collector1 = StringIO.new
data_collector2 = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector1, data_collector2)
pipe.writers #=> [data_collector1, data_collector2]

Returns:

  • (Array<#write>)

    the writers that receive data read from the pipe



# File 'lib/process_executer/monitored_pipe.rb', line 190

def writers
  @writers
end
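
The Array<#write> type means a writer is duck-typed: it only needs a #write method. The sketch below shows a hypothetical custom writer (UpcaseWriter is an invented name) receiving the same data as a StringIO. It calls #write directly rather than going through a MonitoredPipe, so it only illustrates the writer contract.

```ruby
require 'stringio'

# Hypothetical writer: any object with a #write method qualifies.
class UpcaseWriter
  attr_reader :received

  def initialize
    @received = +''
  end

  # Mirrors IO#write by returning the number of bytes handled.
  def write(data)
    @received << data.upcase
    data.bytesize
  end
end

writers = [StringIO.new, UpcaseWriter.new]
writers.each { |w| w.write('Hello World') }

writers[0].string   #=> "Hello World"
writers[1].received #=> "HELLO WORLD"
```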

Instance Method Details

#close

This method returns an undefined value.

Set the state to :closing and wait for the state to be set to :closed

The monitoring thread will see that the state has changed and will close the pipe.

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.state #=> :open
pipe.write('Hello World')
pipe.close
pipe.state #=> :closed
data_collector.string #=> "Hello World"


# File 'lib/process_executer/monitored_pipe.rb', line 85

def close
  return unless state == :open

  @state = :closing
  sleep 0.001 until state == :closed
end
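
The monitoring side of this handshake is not shown on this page. The following sketch shows one way such a handshake could work, using a local state variable and a polling thread. All names, the 1024-byte read size, and the polling intervals are assumptions, not the gem's implementation.

```ruby
require 'stringio'

reader, writer = IO.pipe
sink = StringIO.new
state = :open

monitor = Thread.new do
  loop do
    closing = (state == :closing) # sample the flag before reading
    begin
      sink.write(reader.read_nonblock(1024))
    rescue IO::WaitReadable
      # The pipe is empty. If close was requested before this read,
      # everything written earlier has already been forwarded.
      break if closing

      IO.select([reader], nil, nil, 0.01)
    end
  end
  writer.close
  reader.close
  state = :closed
end

writer.write('Hello World')
state = :closing                   # request the close
sleep 0.001 until state == :closed # wait, as #close does
monitor.join

sink.string #=> "Hello World"
state       #=> :closed
```

Sampling the flag before each read avoids dropping data that was written just before the close request.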

#to_io ⇒ IO

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the write end of the pipe so that data can be written to it

Data written to this end of the pipe will be read by the monitor thread and written to the writers passed to #initialize.

This allows a MonitoredPipe to be passed to Process.spawn as a file descriptor.

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.to_io.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Returns:

  • (IO)

    the write end of the pipe



# File 'lib/process_executer/monitored_pipe.rb', line 111

def to_io
  pipe_writer
end
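
To show why #to_io matters for Process.spawn, here is a sketch with a hypothetical wrapper class (WriteEndWrapper is an invented name, not part of the gem). It relies on Process.spawn converting a redirection target with #to_io, and it assumes the current Ruby interpreter can be launched as a child process via RbConfig.ruby.

```ruby
require 'rbconfig'

# Hypothetical wrapper that exposes an IO through #to_io.
class WriteEndWrapper
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
wrapper = WriteEndWrapper.new(writer)

# The wrapper is used as the redirection target for the child's stdout.
pid = Process.spawn(RbConfig.ruby, '-e', 'print "Hello World"', out: wrapper)
Process.wait(pid)
writer.close # close the parent's copy so the reader sees end-of-file

child_output = reader.read
reader.close

child_output #=> "Hello World"
```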

#write(data) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes data to the pipe so that it can be read by the monitor thread

Primarily used for testing.

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Parameters:

  • data (String)

    the data to write to the pipe

Returns:

  • (Integer)

    the number of bytes written to the pipe

Raises:

  • (IOError)

    if the pipe is not open


# File 'lib/process_executer/monitored_pipe.rb', line 151

def write(data)
  raise IOError, 'closed stream' unless state == :open

  pipe_writer.write(data)
end
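
The guard above raises the same error class and message that Ruby's own IO objects use after they are closed. The sketch below demonstrates that behavior with a plain IO.pipe; it does not use MonitoredPipe, and the variable names are illustrative.

```ruby
reader, writer = IO.pipe
writer.close

error_message =
  begin
    writer.write('Hello World')
    nil
  rescue IOError => e
    e.message
  end
reader.close

error_message #=> "closed stream"
```

Callers that may write after a close can rescue IOError in the same way.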