Class: Celluloid::IO::Stream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/celluloid/io/stream.rb

Overview

Base class of all streams in Celluloid::IO

Direct Known Subclasses

SSLSocket, TCPSocket, UNIXSocket

Defined Under Namespace

Classes: Latch

Constant Summary collapse

BLOCK_SIZE =

Default size to read from or write to the stream for buffer operations

1024*16

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeStream

Returns a new instance of Stream.



22
23
24
25
26
27
28
29
30
# File 'lib/celluloid/io/stream.rb', line 22

def initialize
  @eof  = false
  @sync = true # FIXME: hax
  @read_buffer = ''.force_encoding(Encoding::ASCII_8BIT)
  @write_buffer = ''.force_encoding(Encoding::ASCII_8BIT)

  @read_latch  = Latch.new
  @write_latch = Latch.new
end

Instance Attribute Details

#syncObject

The “sync mode” of the stream

See IO#sync for full details.



17
18
19
# File 'lib/celluloid/io/stream.rb', line 17

def sync
  @sync
end

Instance Method Details

#<<(s) ⇒ Object

Writes s to the stream. s will be converted to a String using String#to_s.



255
256
257
258
# File 'lib/celluloid/io/stream.rb', line 255

def << (s)
  do_write(s)
  self
end

#closeObject

Closes the stream and flushes any unwritten data.



310
311
312
313
# File 'lib/celluloid/io/stream.rb', line 310

def close
  flush rescue nil
  sysclose
end

#each(eol = $/) ⇒ Object Also known as: each_line

Executes the block for every line in the stream where lines are separated by eol.

See also #gets



180
181
182
183
184
# File 'lib/celluloid/io/stream.rb', line 180

def each(eol=$/)
  while line = self.gets(eol)
    yield line
  end
end

#each_byteObject

Calls the given block once for each byte in the stream.



215
216
217
218
219
# File 'lib/celluloid/io/stream.rb', line 215

def each_byte # :yields: byte
  while c = getc
    yield(c.ord)
  end
end

#eof?Boolean Also known as: eof

Returns true if the stream is at file which means there is no more data to be read.

Returns:

  • (Boolean)


240
241
242
243
# File 'lib/celluloid/io/stream.rb', line 240

def eof?
  fill_rbuff if !@eof && @read_buffer.empty?
  @eof && @read_buffer.empty?
end

#flushObject

Flushes buffered data to the stream.



300
301
302
303
304
305
306
307
# File 'lib/celluloid/io/stream.rb', line 300

def flush
  osync = @sync
  @sync = true
  do_write ""
  return self
ensure
  @sync = osync
end

#getcObject

Reads one character from the stream. Returns nil if called at end of file.



210
211
212
# File 'lib/celluloid/io/stream.rb', line 210

def getc
  read(1)
end

#gets(eol = $/, limit = nil) ⇒ Object

Reads the next “line+ from the stream. Lines are separated by eol. If limit is provided the result will not be longer than the given number of bytes.

eol may be a String or Regexp.

Unlike IO#gets the line read will not be assigned to $_.

Unlike IO#gets the separator must be provided if a limit is provided.



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/celluloid/io/stream.rb', line 154

def gets(eol=$/, limit=nil)
  idx = @read_buffer.index(eol)

  until @eof
    break if idx
    fill_rbuff
    idx = @read_buffer.index(eol)
  end

  if eol.is_a?(Regexp)
    size = idx ? idx+$&.size : nil
  else
    size = idx ? idx+eol.size : nil
  end

  if limit and limit >= 0
    size = [size, limit].min
  end

  consume_rbuff(size)
end

Writes args to the stream.

See IO#print for full details.



283
284
285
286
287
288
# File 'lib/celluloid/io/stream.rb', line 283

def print(*args)
  s = ""
  args.each { |arg| s << arg.to_s }
  do_write(s)
  nil
end

#printf(s, *args) ⇒ Object

Formats and writes to the stream converting parameters under control of the format string.

See Kernel#sprintf for format string details.



294
295
296
297
# File 'lib/celluloid/io/stream.rb', line 294

def printf(s, *args)
  do_write(s % args)
  nil
end

#puts(*args) ⇒ Object

Writes args to the stream along with a record separator.

See IO#puts for full details.



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/celluloid/io/stream.rb', line 263

def puts(*args)
  s = ""
  if args.empty?
    s << "\n"
  end

  args.each do |arg|
    s << arg.to_s
    if $/ && /\n\z/ !~ s
      s << "\n"
    end
  end

  do_write(s)
  nil
end

#read(size = nil, buf = nil) ⇒ Object

Reads size bytes from the stream. If buf is provided it must reference a string which will receive the data.

See IO#read for full details.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/celluloid/io/stream.rb', line 86

def read(size=nil, buf=nil)
  if size == 0
    if buf
      buf.clear
      return buf
    else
      return ""
    end
  end

  until @eof
    break if size && size <= @read_buffer.size
    fill_rbuff
    break unless size
  end

  ret = consume_rbuff(size) || ""

  if buf
    buf.replace(ret)
    ret = buf
  end

  (size && ret.empty?) ? nil : ret
end

#readcharObject

Reads a one-character string from the stream. Raises an EOFError at end of file.

Raises:

  • (EOFError)


223
224
225
226
# File 'lib/celluloid/io/stream.rb', line 223

def readchar
  raise EOFError if eof?
  getc
end

#readline(eol = $/) ⇒ Object

Reads a line from the stream which is separated by eol.

Raises EOFError if at end of file.

Raises:

  • (EOFError)


203
204
205
206
# File 'lib/celluloid/io/stream.rb', line 203

def readline(eol=$/)
  raise EOFError if eof?
  gets(eol)
end

#readlines(eol = $/) ⇒ Object

Reads lines from the stream which are separated by eol.

See also #gets



190
191
192
193
194
195
196
197
198
# File 'lib/celluloid/io/stream.rb', line 190

def readlines(eol=$/)
  ary = []

  while line = self.gets(eol)
    ary << line
  end

  ary
end

#readpartial(maxlen, buf = nil) ⇒ Object

Reads at most maxlen bytes from the stream. If buf is provided it must reference a string which will receive the data.

See IO#readpartial for full details.

Raises:

  • (EOFError)


116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/celluloid/io/stream.rb', line 116

def readpartial(maxlen, buf=nil)
  if maxlen == 0
    if buf
      buf.clear
      return buf
    else
      return ""
    end
  end

  if @read_buffer.empty?
    begin
      return sysread(maxlen, buf)
    rescue Errno::EAGAIN
      retry
    end
  end

  ret = consume_rbuff(maxlen)

  if buf
    buf.replace(ret)
    ret = buf
  end

  raise EOFError if ret.empty?
  ret
end

#sysread(length = nil, buffer = nil) ⇒ Object

System read via the nonblocking subsystem



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/celluloid/io/stream.rb', line 39

def sysread(length = nil, buffer = nil)
  buffer ||= ''.force_encoding(Encoding::ASCII_8BIT)

  @read_latch.synchronize do
    begin
      read_nonblock(length, buffer)
    rescue ::IO::WaitReadable
      wait_readable
      retry
    end
  end

  buffer
end

#syswrite(string) ⇒ Object

System write via the nonblocking subsystem



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/celluloid/io/stream.rb', line 55

def syswrite(string)
  length = string.length
  total_written = 0

  remaining = string

  @write_latch.synchronize do
    while total_written < length
      begin
        written = write_nonblock(remaining)
      rescue ::IO::WaitWritable
        wait_writable
        retry
      rescue EOFError
        return total_written
      end

      total_written += written

      # FIXME: mutating the original buffer here. Seems bad.
      remaining.slice!(0, written) if written < remaining.length
    end
  end

  total_written
end

#ungetc(c) ⇒ Object

Pushes character c back onto the stream such that a subsequent buffered character read will return it.

Unlike IO#getc multiple bytes may be pushed back onto the stream.

Has no effect on unbuffered reads (such as #sysread).



234
235
236
# File 'lib/celluloid/io/stream.rb', line 234

def ungetc(c)
  @read_buffer[0,0] = c.chr
end

#wait_readableObject

Wait until the current object is readable



33
# File 'lib/celluloid/io/stream.rb', line 33

def wait_readable; Celluloid::IO.wait_readable(self); end

#wait_writableObject

Wait until the current object is writable



36
# File 'lib/celluloid/io/stream.rb', line 36

def wait_writable; Celluloid::IO.wait_writable(self); end

#write(s) ⇒ Object

Writes s to the stream. If the argument is not a string it will be converted using String#to_s. Returns the number of bytes written.



248
249
250
251
# File 'lib/celluloid/io/stream.rb', line 248

def write(s)
  do_write(s)
  s.bytesize
end