Class: ProcessExecuter::MonitoredPipe
- Inherits: Object
  - Object
    - ProcessExecuter::MonitoredPipe
- Includes: TrackOpenInstances
- Defined in: lib/process_executer/monitored_pipe.rb
Overview
Acts as a pipe that writes the data written to it to one or more destinations.
MonitoredPipe was created to expand the output redirection options for Process.spawn and methods derived from it within the ProcessExecuter module.
This class's initializer accepts any redirection destination supported by Process.spawn (this is the value part of the file redirection option described in the File Redirection section of the Process.spawn documentation).
In addition to the standard redirection destinations, MonitoredPipe supports these types of destinations:
- Arbitrary Writers
  You can redirect subprocess output to any Ruby object that implements the #write method. This is particularly useful for:
  - capturing command output in in-memory buffers like `StringIO`
  - sending command output to custom logging objects that do not have a file descriptor
  - processing command output with a streaming parser while the command is running
- Multiple Destinations
  MonitoredPipe supports duplicating (or "teeing") output to multiple destinations simultaneously. This is achieved by providing a redirection destination in the form [:tee, destination1, destination2, ...], where each destination can be any value that MonitoredPipe itself supports (including another tee or MonitoredPipe).
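As a rough illustration of the two destination types above, the following sketch uses only the standard library to show an object that satisfies the #write contract and fans data out to several writers. `TeeWriter` is a hypothetical name chosen for this example; it is not part of ProcessExecuter, and the library's own handling of [:tee, ...] destinations may differ.

```ruby
require 'stringio'

# Hypothetical helper (not part of ProcessExecuter): any object with a
# #write method can act as a destination, and forwarding to several such
# objects gives "tee" behavior.
class TeeWriter
  def initialize(*destinations)
    @destinations = destinations
  end

  # Forward the data to every destination and return the number of bytes
  # accepted, mirroring the return value of IO#write.
  def write(data)
    @destinations.each { |destination| destination.write(data) }
    data.bytesize
  end
end

buffer = StringIO.new
log = StringIO.new
tee = TeeWriter.new(buffer, log)
tee.write("hello\n")
```

After the call, both `buffer.string` and `log.string` hold the same data, which is the behavior the tee form is described as providing.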
When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe. As data is read from the pipe, it is written to the destination provided in the MonitoredPipe initializer.
If the destination raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in #exception.
⚠️ WARNING
#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the destination, and (3) the monitoring thread is killed.
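The pipe-plus-thread mechanism described in this overview can be sketched with the standard library alone. This is a simplified stand-in under stated assumptions, not the library's implementation: `monitor_pipe`, the `result` hash, and the use of `readpartial` are illustrative choices made for this example.

```ruby
require 'stringio'

# Simplified stand-in for the mechanism described above. The method name,
# the result hash, and the read strategy are assumptions for illustration.
def monitor_pipe(destination, chunk_size: 1024)
  reader, writer = IO.pipe
  result = { exception: nil }

  thread = Thread.new do
    begin
      # readpartial blocks until data is available and raises EOFError
      # once the write end has been closed and the pipe is drained.
      loop { destination.write(reader.readpartial(chunk_size)) }
    rescue EOFError
      # Normal termination: the write end was closed.
    rescue StandardError => e
      # A failing destination stops monitoring; keep the error for later.
      result[:exception] = e
    ensure
      reader.close
    end
  end

  [writer, thread, result]
end

buffer = StringIO.new
writer, thread, result = monitor_pipe(buffer)
writer.write('hello world')

# Without closing the write end and joining the thread, data could still
# be sitting in the pipe when the caller inspects the buffer.
writer.close
thread.join
```

The last two lines play the role that the warning assigns to #close: only after the write end is closed and the thread has finished is the destination guaranteed to have received everything.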
Instance Attribute Summary
- #chunk_size ⇒ Integer (readonly)
  The size of the chunks to read from the pipe.
- #destination ⇒ ProcessExecuter::Destinations::DestinationBase (readonly)
  The redirection destination to write data that is read from the pipe.
- #exception ⇒ Exception? (readonly)
  The exception raised by a destination.
- #fileno ⇒ Integer (readonly)
  The file descriptor for the write end of the pipe.
- #pipe_reader ⇒ IO (readonly, private)
  The read end of the pipe.
- #pipe_writer ⇒ IO (readonly, private)
  The write end of the pipe.
- #state ⇒ Symbol (readonly)
  The state of the pipe.
- #thread ⇒ Thread (readonly, private)
  The thread that monitors the pipe.
Instance Method Summary
- #close
  Set the state to :closing and wait for the state to be set to :closed.
- #initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe (constructor)
  Create a new monitored pipe.
- #to_io ⇒ IO
  Return the write end of the pipe so that data can be written to it.
- #write(data) ⇒ Integer
  Writes data to the pipe so that it can be read by the monitor thread.
Constructor Details
#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe
Create a new monitored pipe
Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.
    # File 'lib/process_executer/monitored_pipe.rb', line 122
    def initialize(redirection_destination, chunk_size: 100_000)
      @destination = Destinations.factory(redirection_destination)

      assert_destination_is_compatible_with_monitored_pipe

      @mutex = Mutex.new
      @condition_variable = ConditionVariable.new
      @chunk_size = chunk_size
      @pipe_reader, @pipe_writer = IO.pipe

      # Set the encoding of the pipe reader to ASCII_8BIT. This is not strictly
      # necessary since read_nonblock always returns a String where encoding is
      # Encoding::ASCII_8BIT, but it is a good practice to explicitly set the
      # encoding.
      pipe_reader.set_encoding(Encoding::ASCII_8BIT)

      @state = :open
      @thread = start_monitoring_thread

      self.class.add_open_instance(self)
    end
Instance Attribute Details
#chunk_size ⇒ Integer (readonly)
The size of the chunks to read from the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 247
    def chunk_size
      @chunk_size
    end
#destination ⇒ ProcessExecuter::Destinations::DestinationBase (readonly)
The redirection destination to write data that is read from the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 261
    def destination
      @destination
    end
#exception ⇒ Exception? (readonly)
The exception raised by a destination
If an exception is raised by a destination, it is stored here. Otherwise, it is nil.
    # File 'lib/process_executer/monitored_pipe.rb', line 294
    def exception
      @exception
    end
#fileno ⇒ Integer (readonly)
The file descriptor for the write end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 205
    def fileno
      pipe_writer.fileno
    end
#pipe_reader ⇒ IO (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The read end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 324
    def pipe_reader
      @pipe_reader
    end
#pipe_writer ⇒ IO (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The write end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 338
    def pipe_writer
      @pipe_writer
    end
#state ⇒ Symbol (readonly)
The state of the pipe
Must be either :open, :closing, or :closed
- :open - the pipe is open and data can be written to it
- :closing - the pipe is being closed and data can no longer be written to it
- :closed - the pipe is closed and data can no longer be written to it
    # File 'lib/process_executer/monitored_pipe.rb', line 281
    def state
      @state
    end
#thread ⇒ Thread (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The thread that monitors the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 310
    def thread
      @thread
    end
Instance Method Details
#close
This method returns an undefined value.
Set the state to :closing and wait for the state to be set to :closed
The monitoring thread will see that the state has changed and will close the pipe.
    # File 'lib/process_executer/monitored_pipe.rb', line 159
    def close
      mutex.synchronize do
        if state == :open
          @state = :closing
          condition_variable.wait(mutex) while @state != :closed
        end
      end

      thread.join
      destination.close
      self.class.remove_open_instance(self)
    end
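The :open, :closing, :closed handshake between #close and the monitoring thread can be illustrated in isolation. In the sketch below, `ClosingHandshake`, its `monitor` loop, and the polling interval are assumptions made for this example; the source shown above does not include the monitoring thread's body, so the real thread presumably also drains and closes the pipe before signaling.

```ruby
# Illustrative handshake only. The class name, the monitor loop, and the
# 10 ms polling interval are assumptions, not the library's code.
class ClosingHandshake
  attr_reader :state

  def initialize
    @mutex = Mutex.new
    @condition_variable = ConditionVariable.new
    @state = :open
    @thread = Thread.new { monitor }
  end

  def close
    @mutex.synchronize do
      if @state == :open
        @state = :closing
        # wait releases the mutex while sleeping and reacquires it on
        # wake-up, so the monitor thread can make progress meanwhile.
        @condition_variable.wait(@mutex) while @state != :closed
      end
    end
    @thread.join
  end

  private

  def monitor
    loop do
      @mutex.synchronize do
        if @state == :closing
          # A real monitor would finish its cleanup here before signaling.
          @state = :closed
          @condition_variable.signal
        end
      end
      break if @state == :closed

      sleep 0.01
    end
  end
end

handshake = ClosingHandshake.new
handshake.close
```

Because the state check happens inside the mutex, calling `close` a second time skips the wait and only joins an already finished thread.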
#to_io ⇒ IO
Return the write end of the pipe so that data can be written to it
Data written to this end of the pipe will be read by the monitor thread and written to the destination.
This allows a MonitoredPipe to be passed to Process.spawn wherever a file descriptor (FD) is expected.
    # File 'lib/process_executer/monitored_pipe.rb', line 189
    def to_io
      pipe_writer
    end
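To show why exposing #to_io is enough for Process.spawn, here is a small standard-library sketch. `PipeWrapper` is a hypothetical class for this example and is not part of ProcessExecuter. The sketch relies on Process.spawn converting a redirection value to an IO via #to_io, and it runs the current Ruby interpreter as the child process.

```ruby
require 'rbconfig'

# Hypothetical wrapper (not part of ProcessExecuter): it is accepted as a
# redirection target because it responds to #to_io.
class PipeWrapper
  def initialize(writer)
    @writer = writer
  end

  # Return the IO whose file descriptor the child process should write to.
  def to_io
    @writer
  end
end

reader, writer = IO.pipe
wrapper = PipeWrapper.new(writer)

# Run the current Ruby interpreter as the child so the sketch does not
# depend on any particular shell command being available.
pid = Process.spawn(RbConfig.ruby, '-e', 'print "hello"', out: wrapper)
Process.wait(pid)

# Close the parent's copy of the write end so the reader sees EOF.
writer.close
output = reader.read
reader.close
```

In this sketch the parent reads the pipe itself after the child exits; MonitoredPipe instead reads it from a monitoring thread while the command runs.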
#write(data) ⇒ Integer
Writes data to the pipe so that it can be read by the monitor thread
Primarily used for testing.
    # File 'lib/process_executer/monitored_pipe.rb', line 227
    def write(data)
      mutex.synchronize do
        raise IOError, 'closed stream' unless state == :open

        pipe_writer.write(data)
      end
    end