Module: ObjectStream
- Includes:
- Enumerable
- Included in:
- JsonStream, MarshalStream, MsgpackStream, YamlStream
- Defined in:
- lib/object-stream.rb
Overview
Stream of objects, with any underlying IO: File, Pipe, Socket, StringIO. Stream is bidirectional if the IO is bidirectional.
Serializes objects using any of several serializers: marshal, yaml, json, msgpack. Works with select/readpartial if the serializer supports it (msgpack and yajl do).
ObjectStream supports three styles of iteration: Enumerable, blocking read, and yielding (non-blocking) read.
Defined Under Namespace
Classes: JsonStream, MarshalStream, MsgpackStream, OverflowError, StreamError, YamlStream
Constant Summary collapse
- VERSION =
"0.5"
- MARSHAL_TYPE =
"marshal".freeze
- YAML_TYPE =
"yaml".freeze
- JSON_TYPE =
"json".freeze
- MSGPACK_TYPE =
"msgpack".freeze
- TYPES =
[ MARSHAL_TYPE, YAML_TYPE, JSON_TYPE, MSGPACK_TYPE ]
- DEFAULT_MAX_OUTBOX =
10
Instance Attribute Summary collapse
-
#io ⇒ Object
readonly
The IO through which the stream reads and writes serialized object data.
-
#max_outbox ⇒ Object
readonly
Number of outgoing objects that can accumulate before the outbox is serialized to the byte buffer (and possibly to the io).
Class Method Summary collapse
- .new(io, type: MARSHAL_TYPE, **opts) ⇒ Object
- .register_type(type, &bl) ⇒ Object
- .stream_class_for(type) ⇒ Object
Instance Method Summary collapse
- #checked_read_from_stream ⇒ Object
-
#close ⇒ Object
Call this if the most recent write was a #write_to_buffer without a #flush_buffer.
- #closed? ⇒ Boolean
-
#each ⇒ Object
Iterate through the (rest of) the stream of objects.
- #eof? ⇒ Boolean (also: #eof)
- #flush_buffer ⇒ Object
- #flush_outbox ⇒ Object
- #initialize(io, max_outbox: DEFAULT_MAX_OUTBOX, **opts) ⇒ Object
-
#read ⇒ Object
If no block given, behaves just the same as #read_one.
-
#read_one ⇒ Object
Read one object from the stream, blocking if necessary.
-
#to_io ⇒ Object
Makes it possible to use stream in a select.
- #to_s ⇒ Object
-
#write(*objects) ⇒ Object
(also: #<<)
Write the given objects to the stream, first flushing any objects in the outbox.
- #write_to_buffer(*objects) ⇒ Object
-
#write_to_outbox(object = nil, &bl) ⇒ Object
Push the given object into the outbox, to be written later when the outbox is flushed.
Instance Attribute Details
#io ⇒ Object (readonly)
The IO through which the stream reads and writes serialized object data.
16 17 18 |
# File 'lib/object-stream.rb', line 16 def io @io end |
#max_outbox ⇒ Object (readonly)
Number of outgoing objects that can accumulate before the outbox is serialized to the byte buffer (and possibly to the io).
20 21 22 |
# File 'lib/object-stream.rb', line 20 def max_outbox @max_outbox end |
Class Method Details
.new(io, type: MARSHAL_TYPE, **opts) ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/object-stream.rb', line 44 def new io, type: MARSHAL_TYPE, **opts if io.kind_of? ObjectStream raise ArgumentError, "given io is already an ObjectStream: #{io.inspect}" end stream_class_for(type).new io, **opts end |
.register_type(type, &bl) ⇒ Object
64 65 66 |
# File 'lib/object-stream.rb', line 64 def register_type type, &bl @stream_class_map[type] = bl end |
.stream_class_for(type) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/object-stream.rb', line 52 def stream_class_for type cl = @stream_class_map[type] return cl if cl.respond_to? :new # Protect against race condition in msgpack and yajl extension # initialization (bug #8374). @mutex.synchronize do return cl if cl.respond_to? :new @stream_class_map[type] = cl.call end end |
Instance Method Details
#checked_read_from_stream ⇒ Object
93 94 95 96 97 98 99 |
# File 'lib/object-stream.rb', line 93 def checked_read_from_stream read_from_stream {|obj| yield obj} rescue IOError, SystemCallError, OverflowError raise rescue => ex raise StreamError, "unreadble stream: #{ex}" end |
#close ⇒ Object
Call this if the most recent write was a #write_to_buffer without a #flush_buffer. If you only use #write, there’s no need to close the stream in any special way.
186 187 188 189 |
# File 'lib/object-stream.rb', line 186 def close flush_outbox io.close end |
#closed? ⇒ Boolean
191 192 193 |
# File 'lib/object-stream.rb', line 191 def closed? io.closed? end |
#each ⇒ Object
Iterate through the (rest of) the stream of objects. Does not raise EOFError, but simply returns. All Enumerable and Enumerator methods are available.
172 173 174 175 176 |
# File 'lib/object-stream.rb', line 172 def each return to_enum unless block_given? read {|obj| yield obj} until eof rescue EOFError end |
#eof? ⇒ Boolean Also known as: eof
178 179 180 |
# File 'lib/object-stream.rb', line 178 def eof? (!@inbox || @inbox.empty?) && io.eof? end |
#flush_buffer ⇒ Object
165 166 167 |
# File 'lib/object-stream.rb', line 165 def flush_buffer self end |
#flush_outbox ⇒ Object
148 149 150 151 152 153 154 155 |
# File 'lib/object-stream.rb', line 148 def flush_outbox @outbox.each do |object| object = object.call if object.kind_of? Proc write_to_stream object end @outbox.clear self end |
#initialize(io, max_outbox: DEFAULT_MAX_OUTBOX, **opts) ⇒ Object
69 70 71 72 73 74 |
# File 'lib/object-stream.rb', line 69 def initialize io, max_outbox: DEFAULT_MAX_OUTBOX, **opts @io = io @max_outbox = max_outbox @inbox = nil @outbox = [] end |
#read ⇒ Object
If no block given, behaves just the same as #read_one. If block given, reads any available data and yields it to the block. This form is non- blocking, if supported by the underlying serializer (such as msgpack).
83 84 85 86 87 88 89 90 91 |
# File 'lib/object-stream.rb', line 83 def read if block_given? read_from_inbox {|obj| yield obj} checked_read_from_stream {|obj| yield obj} return nil else read_one end end |
#read_one ⇒ Object
Read one object from the stream, blocking if necessary. Returns the object. Raises EOFError at the end of the stream.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/object-stream.rb', line 103 def read_one if @inbox and not @inbox.empty? return @inbox.shift end have_result = false result = nil until have_result read do |obj| # might not read enough bytes to yield an obj if have_result (@inbox||=[]) << obj else have_result = true result = obj end end end result end |
#to_io ⇒ Object
Makes it possible to use stream in a select.
196 197 198 |
# File 'lib/object-stream.rb', line 196 def to_io io end |
#to_s ⇒ Object
76 77 78 |
# File 'lib/object-stream.rb', line 76 def to_s "#<#{self.class} io=#{io.inspect}>" end |
#write(*objects) ⇒ Object Also known as: <<
Write the given objects to the stream, first flushing any objects in the outbox. Flushes the underlying byte buffer afterwards.
133 134 135 136 |
# File 'lib/object-stream.rb', line 133 def write *objects write_to_buffer *objects flush_buffer end |
#write_to_buffer(*objects) ⇒ Object
157 158 159 160 161 162 163 |
# File 'lib/object-stream.rb', line 157 def write_to_buffer *objects flush_outbox objects.each do |object| write_to_stream object end self end |
#write_to_outbox(object = nil, &bl) ⇒ Object
Push the given object into the outbox, to be written later when the outbox is flushed. If a block is given, it will be called when the outbox is flushed, and its value will be written instead.
142 143 144 145 146 |
# File 'lib/object-stream.rb', line 142 def write_to_outbox object=nil, &bl @outbox << (bl || object) flush_outbox if @outbox.size > max_outbox self end |