Class: ProcessExecuter::MonitoredPipe
- Inherits: Object
  - Object
  - ProcessExecuter::MonitoredPipe
- Defined in: lib/process_executer/monitored_pipe.rb
Overview
Stream data sent through a pipe to one or more writers.
When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is started to read data written to the pipe.
Data read from that pipe is written to one or more writers passed to #initialize.
If any of the writers raises an exception, the monitoring thread exits, the pipe is closed, and the exception is saved in #exception.
#close must be called to ensure that (1) the pipe is closed, (2) all data is read from the pipe and written to the writers, and (3) the monitoring thread is killed.
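The mechanism described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the `log` and `copy` writers are hypothetical stand-ins, and the sketch stops the reader thread on end-of-file rather than through the state handshake that #close uses.

```ruby
require 'stringio'

# Hypothetical stand-ins for the writers passed to #initialize
log = StringIO.new
copy = StringIO.new
writers = [log, copy]

reader, writer = IO.pipe

# Reader thread: forward each chunk read from the pipe to every writer
monitor = Thread.new do
  loop do
    chunk = reader.readpartial(100_000)
    writers.each { |w| w.write(chunk) }
  end
rescue EOFError
  # The write end was closed and all pending data has been read
end

writer.write('hello ')
writer.write('world')
writer.close # signals end of data to the reader thread
monitor.join # wait until everything has been forwarded
reader.close

puts log.string
```

Every writer only needs to respond to `write`, which is why `StringIO`, `File`, and `$stdout` could all be mixed in the same list.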
Instance Attribute Summary
- #chunk_size ⇒ Integer (readonly)
  The size of the chunks to read from the pipe.
- #exception ⇒ Exception? (readonly)
  The exception raised by a writer.
- #fileno ⇒ Integer (readonly, private)
  The file descriptor for the write end of the pipe.
- #pipe_reader ⇒ IO (readonly)
  The read end of the pipe.
- #pipe_writer ⇒ IO (readonly)
  The write end of the pipe.
- #state ⇒ Symbol (readonly)
  The state of the pipe.
- #thread ⇒ Thread (readonly)
  The thread that monitors the pipe.
- #writers ⇒ Array<#write> (readonly)
  An array of writers to write data that is read from the pipe.
Instance Method Summary
- #close
  Set the state to :closing and wait for the state to be set to :closed.
- #initialize(*writers, chunk_size: 100_000) ⇒ MonitoredPipe (constructor)
  Create a new monitored pipe.
- #to_io ⇒ IO (private)
  Return the write end of the pipe so that data can be written to it.
- #write(data) ⇒ Integer (private)
  Writes data to the pipe so that it can be read by the monitor thread.
Constructor Details
#initialize(*writers, chunk_size: 100_000) ⇒ MonitoredPipe
Create a new monitored pipe
Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.
    # File 'lib/process_executer/monitored_pipe.rb', line 58
    def initialize(*writers, chunk_size: 100_000)
      @writers = writers
      @chunk_size = chunk_size
      @pipe_reader, @pipe_writer = IO.pipe
      @state = :open
      @thread = Thread.new do
        Thread.current.report_on_exception = false
        Thread.current.abort_on_exception = false
        monitor
      end
    end
Instance Attribute Details
#chunk_size ⇒ Integer (readonly)
The size of the chunks to read from the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 169
    def chunk_size
      @chunk_size
    end
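To illustrate what a chunk size means for reads from a pipe, the following sketch reads 10 bytes with a deliberately tiny limit of 4. It assumes `IO#readpartial` as the read call, which may differ from what the monitoring thread actually uses; the variable names are illustrative only.

```ruby
reader, writer = IO.pipe
writer.write('a' * 10)
writer.close

chunk_size = 4 # deliberately tiny so the chunking is visible
chunks = []

begin
  # Each call returns at most chunk_size bytes
  loop { chunks << reader.readpartial(chunk_size) }
rescue EOFError
  # No more data: the write end is closed and the pipe is drained
end
reader.close

p chunks.map(&:size)
```

A larger chunk size means fewer read calls and fewer calls to each writer, at the cost of larger intermediate strings.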
#exception ⇒ Exception? (readonly)
The exception raised by a writer
If an exception is raised by a writer, it is stored here. Otherwise, it is nil.
    # File 'lib/process_executer/monitored_pipe.rb', line 258
    def exception
      @exception
    end
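The idea of saving a writer's exception instead of letting it escape the thread can be sketched as follows. `FailingWriter` and the `captured` variable are hypothetical; this is only an illustration of the pattern under those assumptions, not the library's monitor loop.

```ruby
# Hypothetical writer that always fails
class FailingWriter
  def write(_data)
    raise IOError, 'disk full'
  end
end

captured = nil
reader, writer = IO.pipe

monitor = Thread.new do
  # Do not print the error to stderr when the thread fails
  Thread.current.report_on_exception = false
  begin
    chunk = reader.readpartial(1024)
    FailingWriter.new.write(chunk)
  rescue IOError => e
    captured = e # saved for later inspection instead of propagating
  ensure
    reader.close
  end
end

writer.write('data')
monitor.join
writer.close

puts captured.message
```

Because the error does not propagate to the calling thread, a caller has to check the stored exception explicitly after the pipe has been closed.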
#fileno ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The file descriptor for the write end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 129
    def fileno
      pipe_writer.fileno
    end
#pipe_reader ⇒ IO (readonly)
The read end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 214
    def pipe_reader
      @pipe_reader
    end
#pipe_writer ⇒ IO (readonly)
The write end of the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 225
    def pipe_writer
      @pipe_writer
    end
#state ⇒ Symbol (readonly)
The state of the pipe
Must be either :open, :closing, or :closed.
- :open - the pipe is open and data can be written to it
- :closing - the pipe is being closed and data can no longer be written to it
- :closed - the pipe is closed and data can no longer be written to it
    # File 'lib/process_executer/monitored_pipe.rb', line 245
    def state
      @state
    end
#thread ⇒ Thread (readonly)
The thread that monitors the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 203
    def thread
      @thread
    end
#writers ⇒ Array<#write> (readonly)
An array of writers to write data that is read from the pipe
    # File 'lib/process_executer/monitored_pipe.rb', line 190
    def writers
      @writers
    end
Instance Method Details
#close
This method returns an undefined value.
Set the state to :closing and wait for the state to be set to :closed.
The monitoring thread will see that the state has changed and will close the pipe.
    # File 'lib/process_executer/monitored_pipe.rb', line 85
    def close
      return unless state == :open

      @state = :closing
      sleep 0.001 until state == :closed
    end
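The handshake between the caller and the monitoring thread can be sketched in isolation. `StateHandshake` is a hypothetical stand-in that models only the state transitions; the step where the real monitor drains and closes the pipe is left as a comment.

```ruby
# Hypothetical stand-in that shows only the :open -> :closing -> :closed handshake
class StateHandshake
  attr_reader :state

  def initialize
    @state = :open
    @worker = Thread.new do
      sleep 0.001 until @state == :closing # wait for the close request
      # ... a real monitor would drain and close the pipe here ...
      @state = :closed
    end
  end

  def close
    return unless state == :open

    @state = :closing
    sleep 0.001 until state == :closed # poll until the worker confirms
  end
end

handshake = StateHandshake.new
handshake.close
handshake.close # returns immediately because the state is no longer :open
p handshake.state
```

The guard clause makes repeated calls to `close` harmless, since only the first call finds the state set to :open.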
#to_io ⇒ IO
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the write end of the pipe so that data can be written to it
Data written to this end of the pipe will be read by the monitor thread and written to the writers passed to #initialize.
This is so that a MonitoredPipe can be passed to Process.spawn as a file descriptor.
    # File 'lib/process_executer/monitored_pipe.rb', line 111
    def to_io
      pipe_writer
    end
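The reason `to_io` matters for Process.spawn can be shown with a small, self-contained sketch. `IoWrapper` is a hypothetical class that exists only for this illustration; the sketch assumes that Process.spawn converts redirection targets with `to_io`, and it launches a child Ruby process through `RbConfig.ruby`.

```ruby
require 'rbconfig'

# Hypothetical wrapper: any object that responds to #to_io
class IoWrapper
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
wrapper = IoWrapper.new(writer)

# Redirect the child's stdout to the IO returned by wrapper.to_io
pid = Process.spawn(RbConfig.ruby, '-e', 'print "hello from child"', out: wrapper)
Process.wait(pid)

writer.close # close the parent's copy so the reader sees end-of-file
child_output = reader.read
reader.close

puts child_output
```

With a MonitoredPipe in place of the wrapper, the reading side would be handled by the monitoring thread rather than by an explicit `reader.read`.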
#write(data) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes data to the pipe so that it can be read by the monitor thread
Primarily used for testing.
    # File 'lib/process_executer/monitored_pipe.rb', line 151
    def write(data)
      raise IOError, 'closed stream' unless state == :open

      pipe_writer.write(data)
    end