Class: ProcessExecuter::MonitoredPipe

Inherits:
Object
  • Object
show all
Defined in:
lib/process_executer/monitored_pipe.rb

Overview

Write data sent through a pipe to a destination

When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe.

If the destination raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in #exception.

#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the destination, and (3) the monitoring thread is killed.

Examples:

Collect pipe data into a string

pipe_data = StringIO.new
begin
  pipe = MonitoredPipe.new(pipe_data)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data.string #=> "Hello World"

Collect pipe data into a string AND a file

pipe_data_string = StringIO.new
pipe_data_file = File.open("pipe_data.txt", "w")
begin
  pipe = MonitoredPipe.new(pipe_data_string, pipe_data_file)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data_string.string #=> "Hello World"
File.read("pipe_data.txt") #=> "Hello World"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe

Create a new monitored pipe

Creates a IO.pipe and starts a monitoring thread to read data written to the pipe.

Examples:

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)

Parameters:

  • redirection_destination (Array<#write>)

    as data is read from the pipe, it is written to this destination

  • chunk_size (Integer) (defaults to: 100_000)

    the size of the chunks to read from the pipe



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/process_executer/monitored_pipe.rb', line 56

def initialize(redirection_destination, chunk_size: 100_000)
  @destination = Destinations.factory(redirection_destination)

  assert_destination_is_compatible_with_monitored_pipe

  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = start_monitoring_thread
end

Instance Attribute Details

#chunk_sizeInteger (readonly)

The size of the chunks to read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.chunk_size #=> 1000

Returns:

  • (Integer)

    the size of the chunks to read from the pipe



179
180
181
# File 'lib/process_executer/monitored_pipe.rb', line 179

def chunk_size
  @chunk_size
end

#destinationArray<ProcessExecuter::Destination::Base> (readonly)

The redirection destination to write data that is read from the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.destination #=>

Returns:

  • (Array<ProcessExecuter::Destination::Base>)


193
194
195
# File 'lib/process_executer/monitored_pipe.rb', line 193

def destination
  @destination
end

#exceptionException? (readonly)

The exception raised by a destination

If an exception is raised by a destination, it is stored here. Otherwise, it is nil.

Examples:

pipe.exception #=> nil

Returns:

  • (Exception, nil)

    the exception raised by a destination or nil if no exception was raised



261
262
263
# File 'lib/process_executer/monitored_pipe.rb', line 261

def exception
  @exception
end

#filenoInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The file descriptor for the write end of the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.fileno == pipe.to_io.fileno #=> true

Returns:

  • (Integer)

    the file descriptor for the write end of the pipe



137
138
139
# File 'lib/process_executer/monitored_pipe.rb', line 137

def fileno
  pipe_writer.fileno
end

#pipe_readerIO (readonly)

The read end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_reader #=> #<IO:fd 11>

Returns:

  • (IO)


217
218
219
# File 'lib/process_executer/monitored_pipe.rb', line 217

def pipe_reader
  @pipe_reader
end

#pipe_writerIO (readonly)

The write end of the pipe

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_writer #=> #<IO:fd 12>

Returns:

  • (IO)

    the write end of the pipe



228
229
230
# File 'lib/process_executer/monitored_pipe.rb', line 228

def pipe_writer
  @pipe_writer
end

#stateSymbol (readonly)

The state of the pipe

Must be either :open, :closing, or :closed

  • :open - the pipe is open and data can be written to it
  • :closing - the pipe is being closed and data can no longer be written to it
  • :closed - the pipe is closed and data can no longer be written to it

Examples:

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.state #=> :open
pipe.close
pipe.state #=> :closed

Returns:

  • (Symbol)

    the state of the pipe



248
249
250
# File 'lib/process_executer/monitored_pipe.rb', line 248

def state
  @state
end

#threadThread (readonly)

The thread that monitors the pipe

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>

Returns:

  • (Thread)


206
207
208
# File 'lib/process_executer/monitored_pipe.rb', line 206

def thread
  @thread
end

Instance Method Details

#close

This method returns an undefined value.

Set the state to :closing and wait for the state to be set to :closed

The monitoring thread will see that the state has changed and will close the pipe.

Examples:

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.state #=> :open
pipe.write('Hello World')
pipe.close
pipe.state #=> :closed
data_collector.string #=> "Hello World"


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/process_executer/monitored_pipe.rb', line 84

def close
  mutex.synchronize do
    return unless state == :open

    @state = :closing
  end

  mutex.synchronize do
    condition_variable.wait(mutex) while @state != :closed
  end

  thread.join

  destination.close
end

#to_ioIO

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the write end of the pipe so that data can be written to it

Data written to this end of the pipe will be read by the monitor thread and written to the destination.

This is so we can provide a MonitoredPipe to Process.spawn as a FD

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.to_io.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Returns:

  • (IO)

    the write end of the pipe



119
120
121
# File 'lib/process_executer/monitored_pipe.rb', line 119

def to_io
  pipe_writer
end

#write(data) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Writes data to the pipe so that it can be read by the monitor thread

Primarily used for testing.

Examples:

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

Parameters:

  • data (String)

    the data to write to the pipe

Returns:

  • (Integer)

    the number of bytes written to the pipe



159
160
161
162
163
164
165
# File 'lib/process_executer/monitored_pipe.rb', line 159

def write(data)
  mutex.synchronize do
    raise IOError, 'closed stream' unless state == :open

    pipe_writer.write(data)
  end
end