Class: Polyphony::Pipe
- Defined in:
- lib/polyphony/extensions/pipe.rb,
ext/polyphony/pipe.c
Overview
A Pipe instance represents a UNIX pipe that can be read and written to. This
API is an alternative to the IO.pipe
API, that returns two separate fds, one
for reading and one for writing. Instead, Polyphony::Pipe
encapsulates the
two fds in a single object, providing methods that enable us to treat the pipe
as a normal IO object.
Defined Under Namespace
Classes: ClosedPipeError
Instance Method Summary collapse
-
#<<(buf) ⇒ Integer
Bytes written.
-
#close ⇒ Pipe
Closes the pipe.
-
#closed? ⇒ boolean
Returns true if the pipe is closed.
-
#fds ⇒ Array<Integer>
Returns an array containing the read and write fds for the pipe, respectively.
-
#feed_loop(receiver, method = :call, &block) ⇒ Polyphony::Pipe
Receives data from the pipe in an infinite loop, passing the data to the given receiver using the given method.
-
#getbyte ⇒ Integer?
Reads a single byte from the pipe.
-
#getc ⇒ Object
Reads a single character from the pipe.
-
#gets(sep = $/, _limit = nil, _chomp: nil) ⇒ String?
Read line.
-
#initialize ⇒ Object
constructor
Creates a new pipe.
-
#puts(*args) ⇒ Object
Writes a line with line feed to the pipe.
-
#read(len = nil, buf = nil, buffer_pos = 0) ⇒ String
Reads from the pipe.
-
#read_loop(maxlen = 8192) {|String| ... } ⇒ Polyphony::Pipe
Runs a read loop.
-
#readpartial(len, buf = +'',, buffer_pos = 0, raise_on_eof = true) ⇒ String
Reads from the pipe.
-
#splice_from(src, maxlen) ⇒ Integer
Splices to the pipe from the given source.
-
#tee_from(src, maxlen) ⇒ Integer
Tees to the pipe from the given source.
-
#wait_readable(timeout = nil) ⇒ Polyphony::Pipe
Waits for pipe to become readable.
-
#wait_writable(timeout = nil) ⇒ Polyphony::Pipe
Waits for pipe to become writeable.
-
#write(buf, *args) ⇒ Integer
Bytes written.
Constructor Details
#initialize ⇒ Object
Creates a new pipe.
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'ext/polyphony/pipe.c', line 42
static VALUE Pipe_initialize(VALUE self) {
Pipe_t *pipe_struct;
GetPipe(self, pipe_struct);
int ret = pipe(pipe_struct->fds);
if (ret) {
int e = errno;
rb_syserr_fail(e, strerror(e));
}
pipe_struct->w_closed = 0;
return self;
}
|
Instance Method Details
#<<(buf) ⇒ Integer
Returns bytes written.
85 86 87 88 |
# File 'lib/polyphony/extensions/pipe.rb', line 85 def <<(buf) Polyphony.backend_write(self, buf) self end |
#close ⇒ Pipe
Closes the pipe.
94 95 96 97 98 99 100 101 102 103 |
# File 'ext/polyphony/pipe.c', line 94
VALUE Pipe_close(VALUE self) {
Pipe_t *pipe;
GetPipe(self, pipe);
if (pipe->w_closed)
rb_raise(rb_eRuntimeError, "Pipe is already closed for writing");
Backend_close(BACKEND(), INT2FIX(pipe->fds[1]));
pipe->w_closed = 1;
return self;
}
|
#closed? ⇒ boolean
Returns true if the pipe is closed.
83 84 85 86 87 |
# File 'ext/polyphony/pipe.c', line 83
VALUE Pipe_closed_p(VALUE self) {
Pipe_t *pipe;
GetPipe(self, pipe);
return pipe->w_closed ? Qtrue : Qfalse;
}
|
#fds ⇒ Array<Integer>
Returns an array containing the read and write fds for the pipe, respectively.
111 112 113 114 115 116 |
# File 'ext/polyphony/pipe.c', line 111
VALUE Pipe_fds(VALUE self) {
Pipe_t *pipe;
GetPipe(self, pipe);
return rb_ary_new_from_args(2, INT2FIX(pipe->fds[0]), INT2FIX(pipe->fds[1]));
}
|
#feed_loop(receiver, method = :call, &block) ⇒ Polyphony::Pipe
Receives data from the pipe in an infinite loop, passing the data to the given receiver using the given method. If a block is given, the result of the method call to the receiver is passed to the block.
This method can be used to feed data into parser objects. The following example shows how to feed data from a pipe directly into a MessagePack unpacker:
unpacker = MessagePack::Unpacker.new pipe.feed_loop(unpacker, :feed_each) { |msg| handle_msg(msg) }
198 199 200 |
# File 'lib/polyphony/extensions/pipe.rb', line 198 def feed_loop(receiver, method = :call, &block) Polyphony.backend_feed_loop(self, receiver, method, &block) end |
#getbyte ⇒ Integer?
Reads a single byte from the pipe.
22 23 24 25 |
# File 'lib/polyphony/extensions/pipe.rb', line 22 def getbyte char = getc char ? char.getbyte(0) : nil end |
#getc ⇒ Object
Reads a single character from the pipe.
30 31 32 33 34 35 36 37 38 |
# File 'lib/polyphony/extensions/pipe.rb', line 30 def getc return @read_buffer.slice!(0) if @read_buffer && !@read_buffer.empty? @read_buffer ||= +'' Polyphony.backend_read(self, @read_buffer, 8192, false, -1) return @read_buffer.slice!(0) if !@read_buffer.empty? nil end |
#gets(sep = $/, _limit = nil, _chomp: nil) ⇒ String?
Returns read line.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/polyphony/extensions/pipe.rb', line 94 def gets(sep = $/, _limit = nil, _chomp: nil) if sep.is_a?(Integer) sep = $/ _limit = sep end sep_size = sep.bytesize @read_buffer ||= +'' while true idx = @read_buffer.index(sep) return @read_buffer.slice!(0, idx + sep_size) if idx result = readpartial(8192, @read_buffer, -1) return nil unless result end rescue EOFError return nil end |
#puts(*args) ⇒ Object
Writes a line with line feed to the pipe.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/polyphony/extensions/pipe.rb', line 131 def puts(*args) if args.empty? write LINEFEED return end idx = 0 while idx < args.size arg = args[idx] args[idx] = arg = arg.to_s unless arg.is_a?(String) if arg =~ LINEFEED_RE idx += 1 else args.insert(idx + 1, LINEFEED) idx += 2 end end write(*args) nil end |
#read(len = nil, buf = nil, buffer_pos = 0) ⇒ String
Reads from the pipe.
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/polyphony/extensions/pipe.rb', line 46 def read(len = nil, buf = nil, buffer_pos = 0) return Polyphony.backend_read(self, buf, len, true, buffer_pos) if buf @read_buffer ||= +'' result = Polyphony.backend_read(self, @read_buffer, len, true, -1) return nil unless result already_read = @read_buffer @read_buffer = +'' already_read end |
#read_loop(maxlen = 8192) {|String| ... } ⇒ Polyphony::Pipe
Runs a read loop.
180 181 182 |
# File 'lib/polyphony/extensions/pipe.rb', line 180 def read_loop(maxlen = 8192, &block) Polyphony.backend_read_loop(self, maxlen, &block) end |
#readpartial(len, buf = +'',, buffer_pos = 0, raise_on_eof = true) ⇒ String
Reads from the pipe.
65 66 67 68 69 70 |
# File 'lib/polyphony/extensions/pipe.rb', line 65 def readpartial(len, buf = +'', buffer_pos = 0, raise_on_eof = true) result = Polyphony.backend_read(self, buf, len, false, buffer_pos) raise EOFError if !result && raise_on_eof result end |
#splice_from(src, maxlen) ⇒ Integer
Splices to the pipe from the given source. If maxlen is negative, splices repeatedly using absolute value of maxlen until EOF is encountered.
240 241 242 |
# File 'lib/polyphony/extensions/pipe.rb', line 240 def splice_from(src, maxlen) Polyphony.backend_splice(src, self, maxlen) end |
#tee_from(src, maxlen) ⇒ Integer
Tees to the pipe from the given source.
250 251 252 |
# File 'lib/polyphony/extensions/pipe.rb', line 250 def tee_from(src, maxlen) Polyphony.backend_tee(src, self, maxlen) end |
#wait_readable(timeout = nil) ⇒ Polyphony::Pipe
Waits for pipe to become readable.
206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/polyphony/extensions/pipe.rb', line 206 def wait_readable(timeout = nil) if timeout move_on_after(timeout) do Polyphony.backend_wait_io(self, false) self end else Polyphony.backend_wait_io(self, false) self end end |
#wait_writable(timeout = nil) ⇒ Polyphony::Pipe
Waits for pipe to become writeable.
222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/polyphony/extensions/pipe.rb', line 222 def wait_writable(timeout = nil) if timeout move_on_after(timeout) do Polyphony.backend_wait_io(self, true) self end else Polyphony.backend_wait_io(self, true) self end end |
#write(buf, *args) ⇒ Integer
Returns bytes written.
77 78 79 |
# File 'lib/polyphony/extensions/pipe.rb', line 77 def write(buf, *args) Polyphony.backend_write(self, buf, *args) end |