Class: ProcessExecuter::MonitoredPipe
Inherits: Object
- Object
  - ProcessExecuter::MonitoredPipe
Defined in: lib/process_executer/monitored_pipe.rb
Overview
Writes data sent through a pipe to a destination.
Creating a new MonitoredPipe opens a pipe (via IO.pipe) and starts a thread that reads data written to the pipe and forwards it to the destination.
If the destination raises an exception, the monitoring thread will exit, the
pipe will be closed, and the exception will be saved in #exception.
#close must be called to ensure that (1) the pipe is closed, (2) all data is
read from the pipe and written to the destination, and (3) the monitoring thread is
killed.
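The mechanism described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the StringIO destination and the variable names are assumptions, and the reader thread here stops on end-of-file rather than on a state change.

```ruby
require 'stringio'

# Assumed stand-in for a destination: anything that responds to #write.
destination = StringIO.new
pipe_reader, pipe_writer = IO.pipe

# The monitoring thread copies data from the read end to the destination.
monitor_thread = Thread.new do
  begin
    # readpartial blocks until data is available and raises EOFError
    # once every write end is closed and the pipe is empty.
    loop { destination.write(pipe_reader.readpartial(100_000)) }
  rescue EOFError
    nil
  ensure
    pipe_reader.close
  end
end

pipe_writer.write('hello ')
pipe_writer.write('world')
pipe_writer.close   # signals end-of-file to the monitoring thread
monitor_thread.join # wait until all data has been forwarded

captured_output = destination.string
```

Joining the thread before reading the destination plays the same role that #close plays for a MonitoredPipe: it guarantees that everything written to the pipe has reached the destination.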
Instance Attribute Summary
- #chunk_size ⇒ Integer (readonly)
  The size of the chunks to read from the pipe.
- #destination ⇒ Array<ProcessExecuter::Destination::Base> (readonly)
  The redirection destination to write data that is read from the pipe.
- #exception ⇒ Exception? (readonly)
  The exception raised by a destination.
- #fileno ⇒ Integer (readonly, private)
  The file descriptor for the write end of the pipe.
- #pipe_reader ⇒ IO (readonly)
  The read end of the pipe.
- #pipe_writer ⇒ IO (readonly)
  The write end of the pipe.
- #state ⇒ Symbol (readonly)
  The state of the pipe.
- #thread ⇒ Thread (readonly)
  The thread that monitors the pipe.
Instance Method Summary
- #close
  Set the state to :closing and wait for the state to be set to :closed.
- #initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe (constructor)
  Create a new monitored pipe.
- #to_io ⇒ IO (private)
  Return the write end of the pipe so that data can be written to it.
- #write(data) ⇒ Integer (private)
  Writes data to the pipe so that it can be read by the monitor thread.
Constructor Details
#initialize(redirection_destination, chunk_size: 100_000) ⇒ MonitoredPipe
Create a new monitored pipe
Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.
    # File 'lib/process_executer/monitored_pipe.rb', line 56
    def initialize(redirection_destination, chunk_size: 100_000)
      @destination = Destinations.factory(redirection_destination)
      assert_destination_is_compatible_with_monitored_pipe
      @chunk_size = chunk_size
      @pipe_reader, @pipe_writer = IO.pipe
      @state = :open
      @thread = Thread.new do
        Thread.current.report_on_exception = false
        Thread.current.abort_on_exception = false
        monitor
      end
    end
Instance Attribute Details
#chunk_size ⇒ Integer (readonly)
The size of the chunks to read from the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 172
    def chunk_size
      @chunk_size
    end
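To illustrate what a chunk size means when reading from a pipe, the sketch below reads ten bytes with a deliberately small limit. It uses IO#readpartial for the reads, which is an assumption made for illustration; the source does not show which read call the monitoring thread uses.

```ruby
chunk_size = 4 # tiny on purpose; the constructor default is 100_000
pipe_reader, pipe_writer = IO.pipe

pipe_writer.write('0123456789')
pipe_writer.close

chunks = []
begin
  # Each call returns at most chunk_size bytes.
  loop { chunks << pipe_reader.readpartial(chunk_size) }
rescue EOFError
  # The write end is closed and all data has been consumed.
ensure
  pipe_reader.close
end

reassembled = chunks.join
```

No single read returns more than chunk_size bytes, so a larger value means fewer reads and fewer writes to the destination for the same amount of data.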
#destination ⇒ Array<ProcessExecuter::Destination::Base> (readonly)
The redirection destination to write data that is read from the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 186
    def destination
      @destination
    end
#exception ⇒ Exception? (readonly)
The exception raised by a destination
If an exception is raised by a destination, it is stored here. Otherwise, it is nil.
    # File 'lib/process_executer/monitored_pipe.rb', line 254
    def exception
      @exception
    end
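The pattern of saving a destination's exception instead of letting it escape the thread can be sketched as follows. FailingDestination and the captured variable are hypothetical names used only for this illustration; the library's own monitor method is not shown in the source.

```ruby
# Hypothetical destination that always fails when written to.
class FailingDestination
  def write(_data)
    raise IOError, 'destination unavailable'
  end
end

pipe_reader, pipe_writer = IO.pipe
destination = FailingDestination.new
captured = nil

monitor_thread = Thread.new do
  # Keep the failure quiet; it is reported through `captured` instead.
  Thread.current.report_on_exception = false
  begin
    destination.write(pipe_reader.readpartial(1024))
  rescue StandardError => e
    captured = e # plays the role of #exception
  ensure
    pipe_reader.close
  end
end

pipe_writer.write('data')
monitor_thread.join
pipe_writer.close
```

After the thread exits, the caller can inspect the saved exception and decide whether to re-raise it, which is the purpose #exception serves on a MonitoredPipe.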
#fileno ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The file descriptor for the write end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 132
    def fileno
      pipe_writer.fileno
    end
#pipe_reader ⇒ IO (readonly)
The read end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 210
    def pipe_reader
      @pipe_reader
    end
#pipe_writer ⇒ IO (readonly)
The write end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 221
    def pipe_writer
      @pipe_writer
    end
#state ⇒ Symbol (readonly)
The state of the pipe
Must be one of :open, :closing, or :closed:
- :open - the pipe is open and data can be written to it
- :closing - the pipe is being closed and data can no longer be written to it
- :closed - the pipe is closed and data can no longer be written to it
    # File 'lib/process_executer/monitored_pipe.rb', line 241
    def state
      @state
    end
#thread ⇒ Thread (readonly)
The thread that monitors the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 199
    def thread
      @thread
    end
Instance Method Details
#close
This method returns an undefined value.
Set the state to :closing and wait for the state to be set to :closed
The monitoring thread will see that the state has changed and will close the pipe.
    # File 'lib/process_executer/monitored_pipe.rb', line 86
    def close
      return unless state == :open
      @state = :closing
      sleep 0.001 until state == :closed
      destination.close
    end
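The close handshake between the caller and the monitoring thread can be sketched with a simplified stand-in class. ClosablePipeSketch and its monitor method are assumptions written for this illustration; only the #close and #write bodies mirror the listings in this page, and the real monitor method is not shown in the source.

```ruby
require 'stringio'

# Hypothetical, simplified stand-in for MonitoredPipe.
class ClosablePipeSketch
  attr_reader :state, :destination

  def initialize(destination)
    @destination = destination
    @pipe_reader, @pipe_writer = IO.pipe
    @state = :open
    @thread = Thread.new { monitor }
  end

  def write(data)
    raise IOError, 'closed stream' unless state == :open
    @pipe_writer.write(data)
  end

  # Ask the monitoring thread to finish, then wait until it has.
  def close
    return unless state == :open
    @state = :closing
    sleep 0.001 until state == :closed
  end

  private

  def monitor
    until state == :closing
      # Wait up to 10 ms for data so the state is rechecked regularly.
      next unless IO.select([@pipe_reader], nil, nil, 0.01)
      @destination.write(@pipe_reader.read_nonblock(1024))
    end
    @pipe_writer.close
    @destination.write(@pipe_reader.read) # drain what is left, up to EOF
    @pipe_reader.close
    @state = :closed
  end
end

pipe = ClosablePipeSketch.new(StringIO.new)
pipe.write('abc')
pipe.close
second_close_result = pipe.close # no-op: the state is no longer :open
```

Because the caller waits for :closed, all data written before the close call is in the destination by the time close returns. A second call returns immediately because of the guard on the first line of close.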
#to_io ⇒ IO
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the write end of the pipe so that data can be written to it
Data written to this end of the pipe will be read by the monitor thread and written to the destination.
This allows a MonitoredPipe to be passed to Process.spawn as a file descriptor.

    # File 'lib/process_executer/monitored_pipe.rb', line 114
    def to_io
      pipe_writer
    end
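The sketch below shows why defining to_io is enough for Process.spawn to accept an object as a redirection target. WriterWrapper is a hypothetical class used only for this illustration, and the child process is assumed to be the same Ruby interpreter, located through RbConfig.ruby.

```ruby
require 'rbconfig'

# Hypothetical wrapper: not an IO, but convertible to one via #to_io.
class WriterWrapper
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
wrapper = WriterWrapper.new(writer)

# Process.spawn converts the redirection value with #to_io and uses
# the resulting IO's file descriptor as the child's stdout.
pid = Process.spawn(RbConfig.ruby, '-e', 'print "hello"', out: wrapper)
writer.close # the parent no longer needs the write end
Process.wait(pid)

child_output = reader.read
reader.close
```

In the same way, a MonitoredPipe given as a redirection target hands the child process the write end of its pipe, and the monitoring thread forwards whatever the child writes to the destination.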
#write(data) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes data to the pipe so that it can be read by the monitor thread
Primarily used for testing.
    # File 'lib/process_executer/monitored_pipe.rb', line 154
    def write(data)
      raise IOError, 'closed stream' unless state == :open
      pipe_writer.write(data)
    end