Class: Async::IO::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/async/io/stream.rb

Constant Summary collapse

BLOCK_SIZE =
IO::BLOCK_SIZE

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false) ⇒ Stream

Returns a new instance of Stream.

[View source]

45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/async/io/stream.rb', line 45

def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false)
  @io = io
  @eof = false
  
  @pending = 0
  # This field is ignored, but used to mean, try to buffer packets in a single iteration of the reactor.
  # @deferred = deferred
  
  @writing = Async::Semaphore.new(1)
  
  # We don't want Ruby to do any IO buffering.
  @io.sync = sync
  
  @block_size = block_size
  @maximum_read_size = maximum_read_size
  
  @read_buffer = Buffer.new
  @write_buffer = Buffer.new
  @drain_buffer = Buffer.new
  
  # Used as destination buffer for underlying reads.
  @input_buffer = Buffer.new
end

Instance Attribute Details

#block_sizeObject (readonly)

Returns the value of attribute block_size.


71
72
73
# File 'lib/async/io/stream.rb', line 71

def block_size
  @block_size
end

#ioObject (readonly)

Returns the value of attribute io.


69
70
71
# File 'lib/async/io/stream.rb', line 69

def io
  @io
end

Class Method Details

.open(path, mode = "r+", **options) ⇒ Object

[View source]

33
34
35
36
37
38
39
40
41
42
43
# File 'lib/async/io/stream.rb', line 33

def self.open(path, mode = "r+", **options)
  stream = self.new(File.open(path, mode), **options)
  
  return stream unless block_given?
  
  begin
    yield stream
  ensure
    stream.close
  end
end

Instance Method Details

#<<(string) ⇒ Object

Writes ‘string` to the stream and returns self.

[View source]

183
184
185
186
187
# File 'lib/async/io/stream.rb', line 183

def <<(string)
  write(string)
  
  return self
end

#closeObject

Best effort to flush any unwritten data, and then close the underling IO.

[View source]

216
217
218
219
220
221
222
223
224
225
226
# File 'lib/async/io/stream.rb', line 216

def close
  return if @io.closed?
  
  begin
    flush
  rescue
    # We really can't do anything here unless we want #close to raise exceptions.
  ensure
    @io.close
  end
end

#close_readObject

[View source]

205
206
207
# File 'lib/async/io/stream.rb', line 205

def close_read
  @io.close_read
end

#close_writeObject

[View source]

209
210
211
212
213
# File 'lib/async/io/stream.rb', line 209

def close_write
  flush
ensure
  @io.close_write
end

#closed?Boolean

Returns:

  • (Boolean)
[View source]

201
202
203
# File 'lib/async/io/stream.rb', line 201

def closed?
  @io.closed?
end

#connected?Boolean

Returns:

  • (Boolean)
[View source]

197
198
199
# File 'lib/async/io/stream.rb', line 197

def connected?
  @io.connected?
end

#eof!Object

Raises:

  • (EOFError)
[View source]

241
242
243
244
245
246
# File 'lib/async/io/stream.rb', line 241

def eof!
  @read_buffer.clear
  @eof = true
  
  raise EOFError
end

#eof?Boolean Also known as: eof

Returns true if the stream is at file which means there is no more data to be read.

Returns:

  • (Boolean)
[View source]

229
230
231
232
233
234
235
236
237
# File 'lib/async/io/stream.rb', line 229

def eof?
  if !@read_buffer.empty?
    return false
  elsif @eof
    return true
  else
    return @io.eof?
  end
end

#flushObject

Flushes buffered data to the stream.

[View source]

152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/async/io/stream.rb', line 152

def flush
  return if @write_buffer.empty?
  
  @writing.acquire do
    # Flip the write buffer and drain buffer:
    @write_buffer, @drain_buffer = @drain_buffer, @write_buffer
    
    begin
      @io.write(@drain_buffer)
    ensure
      # If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
      @drain_buffer.clear
    end
  end
end

#gets(separator = $/, **options) ⇒ Object

[View source]

147
148
149
# File 'lib/async/io/stream.rb', line 147

def gets(separator = $/, **options)
  read_until(separator, **options)
end

#peekObject

[View source]

141
142
143
144
145
# File 'lib/async/io/stream.rb', line 141

def peek
  until yield(@read_buffer) or @eof
    fill_read_buffer
  end
end

#puts(*arguments, separator: $/) ⇒ Object

[View source]

189
190
191
192
193
194
195
# File 'lib/async/io/stream.rb', line 189

def puts(*arguments, separator: $/)
  arguments.each do |argument|
    @write_buffer << argument << separator
  end
  
  flush
end

#read(size = nil) ⇒ Object

Reads ‘size` bytes from the stream. If size is not specified, read until end of file.

[View source]

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/async/io/stream.rb', line 74

def read(size = nil)
  return String.new(encoding: Encoding::BINARY) if size == 0
  
  if size
    until @eof or @read_buffer.bytesize >= size
      # Compute the amount of data we need to read from the underlying stream:
      read_size = size - @read_buffer.bytesize
      
      # Don't read less than @block_size to avoid lots of small reads:
      fill_read_buffer(read_size > @block_size ? read_size : @block_size)
    end
  else
    until @eof
      fill_read_buffer
    end
  end
  
  return consume_read_buffer(size)
end

#read_exactly(size, exception: EOFError) ⇒ Object

Raises:

  • (exception)
[View source]

105
106
107
108
109
110
111
112
113
114
115
# File 'lib/async/io/stream.rb', line 105

def read_exactly(size, exception: EOFError)
  if buffer = read(size)
    if buffer.bytesize != size
      raise exception, "could not read enough data"
    end
    
    return buffer
  end
  
  raise exception, "encountered eof while reading data"
end

#read_partial(size = nil) ⇒ Object Also known as: readpartial

Read at most ‘size` bytes from the stream. Will avoid reading from the underlying stream if possible.

[View source]

95
96
97
98
99
100
101
102
103
# File 'lib/async/io/stream.rb', line 95

def read_partial(size = nil)
  return String.new(encoding: Encoding::BINARY) if size == 0

  if !@eof and @read_buffer.empty?
    fill_read_buffer
  end
  
  return consume_read_buffer(size)
end

#read_until(pattern, offset = 0, chomp: true) ⇒ String

Efficiently read data from the stream until encountering pattern.

Parameters:

  • pattern (String)

    The pattern to match.

Returns:

  • (String)

    The contents of the stream up until the pattern, which is consumed but not returned.

[View source]

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/async/io/stream.rb', line 122

def read_until(pattern, offset = 0, chomp: true)
  # We don't want to split on the pattern, so we subtract the size of the pattern.
  split_offset = pattern.bytesize - 1
  
  until index = @read_buffer.index(pattern, offset)
    offset = @read_buffer.bytesize - split_offset
    
    offset = 0 if offset < 0
    
    return unless fill_read_buffer
  end
  
  @read_buffer.freeze
  matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize))
  @read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
  
  return matched
end

#write(string) ⇒ Object

Writes ‘string` to the buffer. When the buffer is full or #sync is true the buffer is flushed to the underlying `io`.

Parameters:

  • string

    the string to write to the buffer.

Returns:

  • the number of bytes appended to the buffer.

[View source]

172
173
174
175
176
177
178
179
180
# File 'lib/async/io/stream.rb', line 172

def write(string)
  @write_buffer << string
  
  if @write_buffer.bytesize >= @block_size
    flush
  end
  
  return string.bytesize
end