Class: Async::IO::Stream
- Inherits:
- Object
- Inheritance chain:
- Object
- Async::IO::Stream
- Defined in:
- lib/async/io/stream.rb
Constant Summary collapse
- BLOCK_SIZE =
The default block size for IO buffers. `BLOCK_SIZE = ENV.fetch('BLOCK_SIZE', 1024*16).to_i`
1024*8
Instance Attribute Summary collapse
-
#block_size ⇒ Object
readonly
Returns the value of attribute block_size.
-
#io ⇒ Object
readonly
Returns the value of attribute io.
Instance Method Summary collapse
-
#<<(string) ⇒ Object
Writes `string` to the stream and returns self.
-
#close ⇒ Object
Closes the stream and flushes any unwritten data.
- #closed? ⇒ Boolean
- #connected? ⇒ Boolean
- #eof! ⇒ Object
-
#eof? ⇒ Boolean
(also: #eof)
Returns true if the stream is at end of file, which means there is no more data to be read.
-
#flush ⇒ Object
Flushes buffered data to the stream.
- #gets(separator = $/, **options) ⇒ Object
-
#initialize(io, block_size: BLOCK_SIZE, sync: true) ⇒ Stream
constructor
A new instance of Stream.
- #peek ⇒ Object
- #puts(*args, separator: $/) ⇒ Object
-
#read(size = nil) ⇒ Object
Reads `size` bytes from the stream.
-
#read_partial(size = nil) ⇒ Object
(also: #readpartial)
Reads at most `size` bytes from the stream.
-
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
-
#write(string) ⇒ Object
Writes `string` to the buffer.
Constructor Details
#initialize(io, block_size: BLOCK_SIZE, sync: true) ⇒ Stream
Returns a new instance of Stream.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/async/io/stream.rb', line 31 def initialize(io, block_size: BLOCK_SIZE, sync: true) @io = io @eof = false # We don't want Ruby to do any IO buffering. @io.sync = sync @block_size = block_size @read_buffer = Buffer.new @write_buffer = Buffer.new # Used as destination buffer for underlying reads. @input_buffer = Buffer.new end |
Instance Attribute Details
#block_size ⇒ Object (readonly)
Returns the value of attribute block_size.
48 49 50 |
# File 'lib/async/io/stream.rb', line 48 def block_size @block_size end |
#io ⇒ Object (readonly)
Returns the value of attribute io.
47 48 49 |
# File 'lib/async/io/stream.rb', line 47 def io @io end |
Instance Method Details
#<<(string) ⇒ Object
Writes `string` to the stream and returns self.
132 133 134 135 136 |
# File 'lib/async/io/stream.rb', line 132 def <<(string) write(string) return self end |
#close ⇒ Object
Closes the stream and flushes any unwritten data.
167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/async/io/stream.rb', line 167 def close return if @io.closed? begin flush rescue # We really can't do anything here unless we want #close to raise exceptions. Async.logger.error(self) {$!} ensure @io.close end end |
#closed? ⇒ Boolean
162 163 164 |
# File 'lib/async/io/stream.rb', line 162 def closed? @io.closed? end |
#connected? ⇒ Boolean
158 159 160 |
# File 'lib/async/io/stream.rb', line 158 def connected? @io.connected? end |
#eof! ⇒ Object
189 190 191 192 193 194 |
# File 'lib/async/io/stream.rb', line 189 def eof! @read_buffer.clear @eof = true raise EOFError end |
#eof? ⇒ Boolean Also known as: eof
Returns true if the stream is at end of file, which means there is no more data to be read.
181 182 183 184 185 |
# File 'lib/async/io/stream.rb', line 181 def eof? fill_read_buffer if !@eof && @read_buffer.empty? return @eof && @read_buffer.empty? end |
#flush ⇒ Object
Flushes buffered data to the stream.
139 140 141 142 143 144 |
# File 'lib/async/io/stream.rb', line 139 def flush unless @write_buffer.empty? syswrite(@write_buffer) @write_buffer.clear end end |
#gets(separator = $/, **options) ⇒ Object
146 147 148 |
# File 'lib/async/io/stream.rb', line 146 def gets(separator = $/, **) read_until(separator, **) end |
#peek ⇒ Object
106 107 108 109 110 |
# File 'lib/async/io/stream.rb', line 106 def peek until yield(@read_buffer) or @eof fill_read_buffer end end |
#puts(*args, separator: $/) ⇒ Object
150 151 152 153 154 155 156 |
# File 'lib/async/io/stream.rb', line 150 def puts(*args, separator: $/) args.each do |arg| @write_buffer << arg << separator end flush end |
#read(size = nil) ⇒ Object
Reads `size` bytes from the stream. If `size` is not specified, reads until end of file.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/async/io/stream.rb', line 51 def read(size = nil) return '' if size == 0 if size until @eof or @read_buffer.size >= size # Compute the amount of data we need to read from the underlying stream: read_size = size - @read_buffer.bytesize # Don't read less than @block_size to avoid lots of small reads: fill_read_buffer(read_size > @block_size ? read_size : @block_size) end else until @eof fill_read_buffer end end return consume_read_buffer(size) end |
#read_partial(size = nil) ⇒ Object Also known as: readpartial
Reads at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
72 73 74 75 76 77 78 79 80 |
# File 'lib/async/io/stream.rb', line 72 def read_partial(size = nil) return '' if size == 0 if @read_buffer.empty? and !@eof fill_read_buffer end return consume_read_buffer(size) end |
#read_until(pattern, offset = 0, chomp: true) ⇒ String
Efficiently read data from the stream until encountering pattern.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/async/io/stream.rb', line 87 def read_until(pattern, offset = 0, chomp: true) # We don't want to split on the pattern, so we subtract the size of the pattern. split_offset = pattern.bytesize - 1 until index = @read_buffer.index(pattern, offset) offset = @read_buffer.size - split_offset offset = 0 if offset < 0 return unless fill_read_buffer end @read_buffer.freeze matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize)) @read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize) return matched end |
#write(string) ⇒ Object
Writes `string` to the buffer. When the buffer is full or `#sync` is true, the buffer is flushed to the underlying `io`.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/async/io/stream.rb', line 116 def write(string) if @write_buffer.empty? and string.bytesize >= @block_size syswrite(string) else @write_buffer << string if @write_buffer.size >= @block_size syswrite(@write_buffer) @write_buffer.clear end end return string.bytesize end |