Class: ObjectStream::MsgpackStream

Inherits:
Object
  • Object
show all
Includes:
ObjectStream
Defined in:
lib/object-stream.rb

Constant Summary collapse

DEFAULT_CHUNK_SIZE =
2000
DEFAULT_MAXBUF =
4000

Constants included from ObjectStream

DEFAULT_MAX_OUTBOX, JSON_TYPE, MARSHAL_TYPE, MSGPACK_TYPE, TYPES, VERSION, YAML_TYPE

Instance Attribute Summary collapse

Attributes included from ObjectStream

#io, #max_outbox

Instance Method Summary collapse

Methods included from ObjectStream

#checked_read_from_stream, #close, #closed?, #each, #eof?, #flush_outbox, new, #read, #read_one, register_type, stream_class_for, #to_io, #to_s, #write, #write_to_outbox

Constructor Details

#initialize(io, chunk_size: DEFAULT_CHUNK_SIZE, maxbuf: DEFAULT_MAXBUF, symbolize_keys: false) ⇒ MsgpackStream

See the discussion in examples/symbolize-keys.rb.



285
286
287
288
289
290
291
292
293
294
# File 'lib/object-stream.rb', line 285

def initialize io, chunk_size: DEFAULT_CHUNK_SIZE, maxbuf: DEFAULT_MAXBUF,
      symbolize_keys: false
  super
  @unpacker = MessagePack::Unpacker.new(symbolize_keys: symbolize_keys)
    # don't specify io, so don't have to read all of io in one loop
  
  @packer = MessagePack::Packer.new(io)
  @chunk_size = chunk_size
  @maxbuf = maxbuf
end

Instance Attribute Details

#chunk_sizeObject

Returns the value of attribute chunk_size.



278
279
280
# File 'lib/object-stream.rb', line 278

def chunk_size
  @chunk_size
end

#maxbufObject

Returns the value of attribute maxbuf.



279
280
281
# File 'lib/object-stream.rb', line 279

def maxbuf
  @maxbuf
end

Instance Method Details

#checkbufObject



315
316
317
318
319
320
# File 'lib/object-stream.rb', line 315

def checkbuf
  if maxbuf and @unpacker.buffer.size > maxbuf
    raise OverflowError,
      "Exceeded buffer limit by #{@unpacker.buffer.size - maxbuf} bytes."
  end
end

#fill_buffer(n) ⇒ Object



305
306
307
# File 'lib/object-stream.rb', line 305

def fill_buffer n
  @unpacker.feed(io.readpartial(n))
end

#flush_bufferObject



335
336
337
338
# File 'lib/object-stream.rb', line 335

def flush_buffer
  @packer.flush
  self
end

#read_from_bufferObject



309
310
311
312
313
# File 'lib/object-stream.rb', line 309

def read_from_buffer
  @unpacker.each do |obj|
    yield obj
  end
end

#read_from_streamObject

Blocks only if no data available on io.



297
298
299
300
301
302
303
# File 'lib/object-stream.rb', line 297

def read_from_stream
  fill_buffer(chunk_size)
  checkbuf if maxbuf
  read_from_buffer do |obj|
    yield obj
  end
end

#write_to_buffer(*objects) ⇒ Object



327
328
329
330
331
332
333
# File 'lib/object-stream.rb', line 327

def write_to_buffer *objects
  flush_outbox
  objects.each do |object|
    @packer.write(object)
  end
  self
end

#write_to_stream(object) ⇒ Object



322
323
324
325
# File 'lib/object-stream.rb', line 322

def write_to_stream object
  @packer.write(object).flush
  self
end