Class: ProcessExecuter::MonitoredPipe
- Inherits: Object
- Defined in: lib/process_executer/monitored_pipe.rb
Overview
Write data sent through a pipe to a destination
When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe.
If the destination raises an exception, the monitoring thread will exit, the
pipe will be closed, and the exception will be saved in #exception.
#close must be called to ensure that (1) the pipe is closed, (2) all data is
read from the pipe and written to the destination, and (3) the monitoring thread
has exited.
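The pattern described above can be sketched with only the Ruby standard library. This is a minimal illustration and not the ProcessExecuter implementation: a StringIO stands in for the destination, and the names destination and monitor are chosen for this example only.

```ruby
require 'stringio'

# Stand-in destination (assumption: any object that responds to #write)
destination = StringIO.new

reader, writer = IO.pipe

# The monitoring thread copies everything written to the pipe into the destination
monitor = Thread.new do
  begin
    loop { destination.write(reader.readpartial(1024)) }
  rescue EOFError
    # Raised once the write end is closed and the pipe has been drained
  end
end

writer.write('hello ')
writer.write('world')

# Closing the write end lets the monitor drain the pipe and exit
writer.close
monitor.join
reader.close

result = destination.string
```

In this sketch the caller closes the write end directly. MonitoredPipe instead exposes #close, which performs the equivalent shutdown steps on the caller's behalf.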
Instance Attribute Summary
-
#chunk_size ⇒ Integer
readonly
The size of the chunks to read from the pipe.
-
#destination ⇒ Array<ProcessExecuter::Destination::Base>
readonly
The redirection destination to write data that is read from the pipe.
-
#exception ⇒ Exception?
readonly
The exception raised by a destination.
-
#fileno ⇒ Integer
readonly
private
The file descriptor for the write end of the pipe.
-
#pipe_reader ⇒ IO
readonly
The read end of the pipe.
-
#pipe_writer ⇒ IO
readonly
The write end of the pipe.
-
#state ⇒ Symbol
readonly
The state of the pipe.
-
#thread ⇒ Thread
readonly
The thread that monitors the pipe.
Instance Method Summary
-
#close
Set the state to :closing and wait for the state to be set to :closed.
-
#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe
constructor
Create a new monitored pipe.
-
#to_io ⇒ IO
private
Return the write end of the pipe so that data can be written to it.
-
#write(data) ⇒ Integer
private
Writes data to the pipe so that it can be read by the monitor thread.
Constructor Details
#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe
Create a new monitored pipe
Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.
# File 'lib/process_executer/monitored_pipe.rb', line 56

def initialize(redirection_destination, chunk_size: 100_000)
  @destination = Destinations.factory(redirection_destination)
  assert_destination_is_compatible_with_monitored_pipe

  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = start_monitoring_thread
end
Instance Attribute Details
#chunk_size ⇒ Integer (readonly)
The size of the chunks to read from the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 179

def chunk_size
  @chunk_size
end
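The effect of a chunk size can be illustrated with a plain pipe. The sketch below assumes chunked reads via IO#readpartial. The source shown here does not reveal how the monitoring thread actually reads, so treat this as an illustration of the idea only.

```ruby
chunk_size = 4 # deliberately tiny so that the chunking is visible

reader, writer = IO.pipe
writer.write('abcdefghij')
writer.close

chunks = []
begin
  # Each call returns at most chunk_size bytes
  loop { chunks << reader.readpartial(chunk_size) }
rescue EOFError
  # All data has been read
end
reader.close
```

A larger chunk size means fewer reads for the same amount of data, at the cost of a larger buffer per read.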
#destination ⇒ Array<ProcessExecuter::Destination::Base> (readonly)
The redirection destination to write data that is read from the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 193

def destination
  @destination
end
#exception ⇒ Exception? (readonly)
The exception raised by a destination
If an exception is raised by a destination, it is stored here. Otherwise, it is nil.
# File 'lib/process_executer/monitored_pipe.rb', line 261

def exception
  @exception
end
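One way a monitoring thread can save a destination's exception rather than let it escape is sketched below. FailingDestination and saved_exception are hypothetical names for this example, and the rescue structure is an assumption, not the library's code.

```ruby
# Hypothetical destination that always fails, used only for this illustration
class FailingDestination
  def write(_data)
    raise 'destination is unavailable'
  end
end

destination = FailingDestination.new
reader, writer = IO.pipe
saved_exception = nil

monitor = Thread.new do
  begin
    loop { destination.write(reader.readpartial(1024)) }
  rescue EOFError
    # Normal end of input
  rescue StandardError => e
    saved_exception = e # remember the failure instead of crashing the thread
  ensure
    reader.close # the pipe is closed once monitoring stops
  end
end

writer.write('some data')
monitor.join
writer.close
```

After the thread exits, the caller can inspect the saved exception, much as a caller of MonitoredPipe can inspect #exception.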
#fileno ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The file descriptor for the write end of the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 137

def fileno
  pipe_writer.fileno
end
#pipe_reader ⇒ IO (readonly)
The read end of the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 217

def pipe_reader
  @pipe_reader
end
#pipe_writer ⇒ IO (readonly)
The write end of the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 228

def pipe_writer
  @pipe_writer
end
#state ⇒ Symbol (readonly)
The state of the pipe
Must be either :open, :closing, or :closed
- :open - the pipe is open and data can be written to it
- :closing - the pipe is being closed and data can no longer be written to it
- :closed - the pipe is closed and data can no longer be written to it

# File 'lib/process_executer/monitored_pipe.rb', line 248

def state
  @state
end
#thread ⇒ Thread (readonly)
The thread that monitors the pipe
# File 'lib/process_executer/monitored_pipe.rb', line 206

def thread
  @thread
end
Instance Method Details
#close
This method returns an undefined value.
Set the state to :closing and wait for the state to be set to :closed
The monitoring thread will see that the state has changed and will close the pipe.
# File 'lib/process_executer/monitored_pipe.rb', line 84

def close
  mutex.synchronize do
    return unless state == :open

    @state = :closing
  end

  mutex.synchronize do
    condition_variable.wait(mutex) while @state != :closed
  end

  thread.join

  destination.close
end
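The closing handshake between the caller and the monitoring thread can be sketched in isolation. ClosableWorker is a hypothetical class written for this example. Its worker polls the state, which is an assumption about how a monitoring thread might notice the change.

```ruby
# Hypothetical class for illustration; not part of ProcessExecuter
class ClosableWorker
  attr_reader :state

  def initialize
    @mutex = Mutex.new
    @condition_variable = ConditionVariable.new
    @state = :open
    @thread = Thread.new { run }
  end

  def close
    @mutex.synchronize do
      return unless @state == :open

      @state = :closing
      # Sleep until the worker announces that it has finished
      @condition_variable.wait(@mutex) while @state != :closed
    end
    @thread.join
  end

  private

  def run
    # Poll until a caller requests closing
    sleep 0.01 until @mutex.synchronize { @state == :closing }
    @mutex.synchronize do
      @state = :closed
      @condition_variable.signal
    end
  end
end

worker = ClosableWorker.new
worker.close
final_state = worker.state
worker.close # returns immediately because the state is no longer :open
```

Waiting inside a while loop guards against spurious wakeups: the caller only proceeds once the state is really :closed.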
#to_io ⇒ IO
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the write end of the pipe so that data can be written to it
Data written to this end of the pipe will be read by the monitor thread and written to the destination.
This allows a MonitoredPipe to be given to Process.spawn as a file descriptor.

# File 'lib/process_executer/monitored_pipe.rb', line 119

def to_io
  pipe_writer
end
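Ruby converts a redirection target to an IO by calling #to_io on it, which is why an object that wraps a pipe can be handed to Process.spawn. The sketch below uses a hypothetical WriterWrapper class in place of MonitoredPipe and spawns the running Ruby interpreter (located via RbConfig.ruby) as the child process.

```ruby
require 'rbconfig'

# Hypothetical wrapper for illustration; it only exposes #to_io
class WriterWrapper
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
wrapper = WriterWrapper.new(writer)

# Process.spawn converts the redirection target with #to_io
pid = Process.spawn(RbConfig.ruby, '-e', 'print "hello from child"', out: wrapper)
Process.wait(pid)

writer.close # close the parent's copy so the reader sees end-of-file
child_output = reader.read
reader.close
```

The example reads the output only after the child exits, which is safe for small outputs. A monitoring thread avoids the deadlock that would occur if the child filled the pipe buffer before anything read from it.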
#write(data) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes data to the pipe so that it can be read by the monitor thread
Primarily used for testing.
# File 'lib/process_executer/monitored_pipe.rb', line 159

def write(data)
  mutex.synchronize do
    raise IOError, 'closed stream' unless state == :open

    pipe_writer.write(data)
  end
end
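The state check in #write can be tried out in isolation with a small stand-in. GuardedWriter is a hypothetical class for this example. It mirrors only the guard shown above and writes to a StringIO instead of a pipe.

```ruby
require 'stringio'

# Hypothetical class for illustration; not part of ProcessExecuter
class GuardedWriter
  def initialize(io)
    @io = io
    @mutex = Mutex.new
    @state = :open
  end

  def write(data)
    @mutex.synchronize do
      raise IOError, 'closed stream' unless @state == :open

      @io.write(data)
    end
  end

  def close
    @mutex.synchronize { @state = :closed }
  end
end

io = StringIO.new
guarded = GuardedWriter.new(io)
bytes_written = guarded.write('abc') # number of bytes written

guarded.close
error = begin
  guarded.write('def')
  nil
rescue IOError => e
  e
end
```

Holding the mutex for both the state check and the write prevents another thread from changing the state between the two steps.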