Class: Polyphony::Pipe

Inherits:
Object show all
Defined in:
lib/polyphony/extensions/pipe.rb,
ext/polyphony/pipe.c

Overview

A Pipe instance represents a UNIX pipe that can be read and written to. This API is an alternative to the IO.pipe API, that returns two separate fds, one for reading and one for writing. Instead, Polyphony::Pipe encapsulates the two fds in a single object, providing methods that enable us to treat the pipe as a normal IO object.

Defined Under Namespace

Classes: ClosedPipeError

Instance Method Summary collapse

Constructor Details

#initializeObject

Creates a new pipe.



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'ext/polyphony/pipe.c', line 42

static VALUE Pipe_initialize(VALUE self) {
  Pipe_t *pipe_struct;
  GetPipe(self, pipe_struct);

  int ret = pipe(pipe_struct->fds);
  if (ret) {
    int e = errno;
    rb_syserr_fail(e, strerror(e));
  }
  pipe_struct->w_closed = 0;

  return self;
}

Instance Method Details

#<<(buf) ⇒ Integer

Returns bytes written.

Parameters:

  • buf (String)

    data to write

Returns:

  • (Integer)

    bytes written



85
86
87
88
# File 'lib/polyphony/extensions/pipe.rb', line 85

def <<(buf)
  Polyphony.backend_write(self, buf)
  self
end

#closePipe

Closes the pipe.

Returns:



94
95
96
97
98
99
100
101
102
103
# File 'ext/polyphony/pipe.c', line 94

VALUE Pipe_close(VALUE self) {
  Pipe_t *pipe;
  GetPipe(self, pipe);
  if (pipe->w_closed)
    rb_raise(rb_eRuntimeError, "Pipe is already closed for writing");

  Backend_close(BACKEND(), INT2FIX(pipe->fds[1]));
  pipe->w_closed = 1;
  return self;
}

#closed?boolean

Returns true if the pipe is closed.

Returns:

  • (boolean)


83
84
85
86
87
# File 'ext/polyphony/pipe.c', line 83

VALUE Pipe_closed_p(VALUE self) {
  Pipe_t *pipe;
  GetPipe(self, pipe);
  return pipe->w_closed ? Qtrue : Qfalse;
}

#fdsArray<Integer>

Returns an array containing the read and write fds for the pipe, respectively.

Returns:

  • (Array<Integer>)


111
112
113
114
115
116
# File 'ext/polyphony/pipe.c', line 111

VALUE Pipe_fds(VALUE self) {
  Pipe_t *pipe;
  GetPipe(self, pipe);

  return rb_ary_new_from_args(2, INT2FIX(pipe->fds[0]), INT2FIX(pipe->fds[1]));
}

#feed_loop(receiver, method = :call, &block) ⇒ Polyphony::Pipe

Receives data from the pipe in an infinite loop, passing the data to the given receiver using the given method. If a block is given, the result of the method call to the receiver is passed to the block.

This method can be used to feed data into parser objects. The following example shows how to feed data from a pipe directly into a MessagePack unpacker:

unpacker = MessagePack::Unpacker.new pipe.feed_loop(unpacker, :feed_each) { |msg| handle_msg(msg) }

Parameters:

  • receiver (any)

    receiver object

  • method (Symbol) (defaults to: :call)

    method to call

Returns:



198
199
200
# File 'lib/polyphony/extensions/pipe.rb', line 198

def feed_loop(receiver, method = :call, &block)
  Polyphony.backend_feed_loop(self, receiver, method, &block)
end

#getbyteInteger?

Reads a single byte from the pipe.

Returns:

  • (Integer, nil)

    byte value



22
23
24
25
# File 'lib/polyphony/extensions/pipe.rb', line 22

def getbyte
  char = getc
  char ? char.getbyte(0) : nil
end

#getcObject

Reads a single character from the pipe.

Returns:

  • |String, nil] read character



30
31
32
33
34
35
36
37
38
# File 'lib/polyphony/extensions/pipe.rb', line 30

def getc
  return @read_buffer.slice!(0) if @read_buffer && !@read_buffer.empty?

  @read_buffer ||= +''
  Polyphony.backend_read(self, @read_buffer, 8192, false, -1)
  return @read_buffer.slice!(0) if !@read_buffer.empty?

  nil
end

#gets(sep = $/, _limit = nil, _chomp: nil) ⇒ String?

Returns read line.

Parameters:

  • sep (String) (defaults to: $/)

    line separator

  • _limit (Integer, nil) (defaults to: nil)

    line length limit

  • _chomp (boolean, nil) (defaults to: nil)

    whether to chomp the read line

Returns:

  • (String, nil)

    read line



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/polyphony/extensions/pipe.rb', line 94

def gets(sep = $/, _limit = nil, _chomp: nil)
  if sep.is_a?(Integer)
    sep = $/
    _limit = sep
  end
  sep_size = sep.bytesize

  @read_buffer ||= +''

  while true
    idx = @read_buffer.index(sep)
    return @read_buffer.slice!(0, idx + sep_size) if idx

    result = readpartial(8192, @read_buffer, -1)
    return nil unless result
  end
rescue EOFError
  return nil
end

#puts(*args) ⇒ Object

Writes a line with line feed to the pipe.

Parameters:

  • args (Array)

    zero or more lines



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/polyphony/extensions/pipe.rb', line 131

def puts(*args)
  if args.empty?
    write LINEFEED
    return
  end

  idx = 0
  while idx < args.size
    arg = args[idx]
    args[idx] = arg = arg.to_s unless arg.is_a?(String)
    if arg =~ LINEFEED_RE
      idx += 1
    else
      args.insert(idx + 1, LINEFEED)
      idx += 2
    end
  end

  write(*args)
  nil
end

#read(len = nil, buf = nil, buffer_pos = 0) ⇒ String

Reads from the pipe.

Parameters:

  • len (Integer, nil) (defaults to: nil)

    maximum bytes to read

  • buf (String, nil) (defaults to: nil)

    buffer to read into

  • buffer_pos (Integer) (defaults to: 0)

    buffer position to read into

Returns:

  • (String)

    read data



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/polyphony/extensions/pipe.rb', line 46

def read(len = nil, buf = nil, buffer_pos = 0)
  return Polyphony.backend_read(self, buf, len, true, buffer_pos) if buf

  @read_buffer ||= +''
  result = Polyphony.backend_read(self, @read_buffer, len, true, -1)
  return nil unless result

  already_read = @read_buffer
  @read_buffer = +''
  already_read
end

#read_loop(maxlen = 8192) {|String| ... } ⇒ Polyphony::Pipe

Runs a read loop.

Parameters:

  • maxlen (Integer) (defaults to: 8192)

    maximum bytes to read

Yields:

  • (String)

    read data

Returns:



180
181
182
# File 'lib/polyphony/extensions/pipe.rb', line 180

def read_loop(maxlen = 8192, &block)
  Polyphony.backend_read_loop(self, maxlen, &block)
end

#readpartial(len, buf = +'',, buffer_pos = 0, raise_on_eof = true) ⇒ String

Reads from the pipe.

Parameters:

  • len (Integer, nil)

    maximum bytes to read

  • buf (String, nil) (defaults to: +'',)

    buffer to read into

  • buffer_pos (Integer) (defaults to: 0)

    buffer position to read into

  • raise_on_eof (boolean) (defaults to: true)

    whether to raise an error if EOF is detected

Returns:

  • (String)

    read data

Raises:

  • (EOFError)


65
66
67
68
69
70
# File 'lib/polyphony/extensions/pipe.rb', line 65

def readpartial(len, buf = +'', buffer_pos = 0, raise_on_eof = true)
  result = Polyphony.backend_read(self, buf, len, false, buffer_pos)
  raise EOFError if !result && raise_on_eof

  result
end

#splice_from(src, maxlen) ⇒ Integer

Splices to the pipe from the given source. If maxlen is negative, splices repeatedly using absolute value of maxlen until EOF is encountered.

Parameters:

  • src (IO)

    source to splice from

  • maxlen (Integer)

    maximum bytes to splice

Returns:

  • (Integer)

    bytes spliced



240
241
242
# File 'lib/polyphony/extensions/pipe.rb', line 240

def splice_from(src, maxlen)
  Polyphony.backend_splice(src, self, maxlen)
end

#tee_from(src, maxlen) ⇒ Integer

Tees to the pipe from the given source.

Parameters:

  • src (IO)

    source to tee from

  • maxlen (Integer)

    maximum bytes to tee

Returns:

  • (Integer)

    bytes teed



250
251
252
# File 'lib/polyphony/extensions/pipe.rb', line 250

def tee_from(src, maxlen)
  Polyphony.backend_tee(src, self, maxlen)
end

#wait_readable(timeout = nil) ⇒ Polyphony::Pipe

Waits for pipe to become readable.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    optional timeout in seconds

Returns:



206
207
208
209
210
211
212
213
214
215
216
# File 'lib/polyphony/extensions/pipe.rb', line 206

def wait_readable(timeout = nil)
  if timeout
    move_on_after(timeout) do
      Polyphony.backend_wait_io(self, false)
      self
    end
  else
    Polyphony.backend_wait_io(self, false)
    self
  end
end

#wait_writable(timeout = nil) ⇒ Polyphony::Pipe

Waits for pipe to become writeable.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    optional timeout in seconds

Returns:



222
223
224
225
226
227
228
229
230
231
232
# File 'lib/polyphony/extensions/pipe.rb', line 222

def wait_writable(timeout = nil)
  if timeout
    move_on_after(timeout) do
      Polyphony.backend_wait_io(self, true)
      self
    end
  else
    Polyphony.backend_wait_io(self, true)
    self
  end
end

#write(buf, *args) ⇒ Integer

Returns bytes written.

Parameters:

  • buf (String)

    data to write

  • args (any)

    further arguments to pass to Polyphony.backend_write

Returns:

  • (Integer)

    bytes written



77
78
79
# File 'lib/polyphony/extensions/pipe.rb', line 77

def write(buf, *args)
  Polyphony.backend_write(self, buf, *args)
end