Module: DTAS::Buffer::ReadWrite
- Defined in:
- lib/dtas/buffer/read_write.rb
Overview
:nodoc:
Constant Summary collapse
- MAX_AT_ONCE =
Minimum PIPE_BUF value guaranteed by POSIX
512
Instance Attribute Summary collapse
-
#buffer_size ⇒ Object
Returns the value of attribute buffer_size.
Instance Method Summary collapse
- #_rbuf ⇒ Object
- #broadcast_inf(targets) ⇒ Object
-
#broadcast_one(targets) ⇒ Object
always block when we have a single target.
-
#discard(bytes) ⇒ Object
be sure to only call this with nil when all writers to @wr are done.
Instance Attribute Details
#buffer_size ⇒ Object
Returns the value of attribute buffer_size.
9 10 11 |
# Reader for the buffer_size attribute: returns @buffer_size unchanged.
# NOTE(review): this listing's line range (9-11) overlaps the _rbuf listing
# (11-13), so it is presumably rendered by the documentation tool from an
# attribute declaration rather than copied from a literal `def` -- confirm
# against lib/dtas/buffer/read_write.rb before editing it as code.
# File 'lib/dtas/buffer/read_write.rb', line 9 def buffer_size @buffer_size end |
Instance Method Details
#_rbuf ⇒ Object
11 12 13 |
# File 'lib/dtas/buffer/read_write.rb', line 11
# Returns the reusable scratch string stored under
# Thread.current[:dtas_pbuf], creating it on first use. The same object is
# handed back on every later call from the same context, so callers can
# pass it as the out-buffer of read/readpartial/read_nonblock without
# allocating a new string per transfer.
def _rbuf
  # String.new instead of a "" literal: a literal is frozen when the file
  # enables `# frozen_string_literal: true`, and the read calls that receive
  # this buffer must be able to overwrite it. String.new also starts out in
  # binary encoding, which suits a raw byte buffer.
  Thread.current[:dtas_pbuf] ||= String.new
end
#broadcast_inf(targets) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/dtas/buffer/read_write.rb', line 43
# Reads one chunk (at most MAX_AT_ONCE bytes, bounded by `inflight`) from
# @to_io and copies it to every entry of `targets`.
#
# targets - collection of destinations responding to nonblock?, write and
#           write_nonblock. Entries that fail with IOError/EPIPE are
#           reported through __dst_error and removed from `targets`.
#
# Returns:
#   * `targets` itself when IO.select reports nothing writable,
#   * the subset of not-yet-writable targets when only some are writable
#     (both only when the targets are all blocking or all non-blocking),
#   * :wait_readable after a transfer if any target remains,
#   * nil after a transfer if no target remains.
def broadcast_inf(targets)
  nonblocking = targets.count(&:nonblock?)
  uniform = nonblocking == 0 || nonblocking == targets.size

  if uniform
    # if all targets are full, don't start until they're all writable
    ready = IO.select(nil, targets, nil, 0)
    return targets unless ready

    blocked = targets - ready[1]

    # tell DTAS::UNIXServer#run_once to wait on the blocked targets
    return blocked if blocked[0]

    # all writable, yay!
  else
    blocked = []
  end

  # destination => bytes still owed to it after a short non-blocking write
  again = {}

  # don't pin too much on one target
  bytes = inflight
  bytes = MAX_AT_ONCE if bytes > MAX_AT_ONCE
  chunk = _rbuf
  @to_io.read(bytes, chunk)
  total = chunk.bytesize
  @bytes_xfer += total

  # first pass: offer the whole chunk to every target
  targets.delete_if do |dst|
    begin
      if dst.nonblock?
        written = dst.write_nonblock(chunk)
        again[dst] = chunk.byteslice(written, total) if written < total
      else
        dst.write(chunk)
      end
      false
    rescue Errno::EAGAIN
      # NOTE(review): `blocked` is appended to here and below, but the
      # return value at the end of this method never consults it.
      blocked << dst
      false
    rescue IOError, Errno::EPIPE => err
      again.delete(dst)
      __dst_error(dst, err)
      true
    end
  end

  # try to write as much as possible
  until again.empty?
    again.delete_if do |dst, rest|
      begin
        written = dst.write_nonblock(rest)
        size = rest.bytesize
        done = written >= size
        # replacing the value of an existing key is allowed mid-iteration
        again[dst] = rest.byteslice(written, size) unless done
        done
      rescue Errno::EAGAIN
        # the unwritten remainder for this target is dropped here
        blocked << dst
        true
      rescue IOError, Errno::EPIPE => err
        __dst_error(dst, err)
        true
      end
    end
  end

  targets[0] ? :wait_readable : nil
end
#broadcast_one(targets) ⇒ Object
always block when we have a single target
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/dtas/buffer/read_write.rb', line 27
# always block when we have a single target
#
# Reads up to MAX_AT_ONCE bytes from @to_io with read_nonblock, passes them
# to targets[0].write, and adds that call's return value to @bytes_xfer.
#
# targets - collection whose first element receives the data; it is cleared
#           when an EPIPE/IOError is rescued.
#
# Returns :wait_readable after a successful transfer or when Errno::EAGAIN
# is raised, and nil on EOFError or after a rescued EPIPE/IOError.
def broadcast_one(targets)
  begin
    chunk = _rbuf
    @to_io.read_nonblock(MAX_AT_ONCE, chunk)
    written = targets[0].write(chunk) # IO#write has write-in-full behavior
    @bytes_xfer += written
    return :wait_readable
  rescue EOFError
    # must stay ahead of the IOError clause below: EOFError is an IOError
    return nil
  rescue Errno::EAGAIN
    return :wait_readable
  rescue Errno::EPIPE, IOError => err
    __dst_error(targets[0], err)
    targets.clear
    return nil # do not return error here, we already spewed an error message
  end
end
#discard(bytes) ⇒ Object
be sure to only call this with nil when all writers to @wr are done
16 17 18 19 20 21 22 23 24 |
# File 'lib/dtas/buffer/read_write.rb', line 16
# be sure to only call this with nil when all writers to @wr are done
#
# Reads from @to_io into the scratch buffer and throws the data away,
# repeating until `bytes` has been counted down to zero. At least one
# readpartial call is always made. Stops early, returning nil, on EOFError.
#
# NOTE(review): the comment above mentions passing nil, but this body
# subtracts from `bytes`, which nil does not support -- confirm the
# intended contract with the callers.
def discard(bytes)
  scratch = _rbuf
  loop do
    begin
      @to_io.readpartial(bytes, scratch)
      bytes -= scratch.bytesize
    rescue EOFError
      return
    end
    break if bytes == 0
  end
end