Class: Pocolog::BlockStream
- Inherits: Object
- Defined in: lib/pocolog/block_stream.rb
Overview
Enumeration of blocks in a pocolog-compatible stream
Defined Under Namespace
Classes: BlockHeader, DataBlockHeader, StreamBlock
Constant Summary
- FORMAT_VERSION = Format::Current::VERSION
- BLOCK_HEADER_SIZE = Format::Current::BLOCK_HEADER_SIZE
  The size of the generic block header
- TIME_SIZE = Format::Current::TIME_SIZE
  The size of a time in a block header
- MAGIC = Format::Current::MAGIC
  Magic code at the beginning of the log file
- DEFAULT_BUFFER_READ = 4 * 1024
  Read by 4kB chunks by default
Instance Attribute Summary
- #buffer_read ⇒ Integer (readonly)
  The amount of bytes that should be read into the internal buffer.
- #io ⇒ Object (readonly)
  The underlying IO.
Class Method Summary
- .open(path) ⇒ BlockStream
  Create a BlockStream object that acts on a given file.
- .read_block_header(io, pos = nil) ⇒ BlockHeader
- .read_block_header_raw(io, pos = nil) ⇒ Object
- .read_stream_block(io, pos = nil) ⇒ Object
Instance Method Summary
- #big_endian? ⇒ Boolean
  Whether the data in the file is stored in little or big endian.
- #close ⇒ Object
  Close the file.
- #closed? ⇒ Boolean
  Whether this stream is closed.
- #flush ⇒ Object
  Flush buffers to the underlying backing store.
- #initialize(io, buffer_read: DEFAULT_BUFFER_READ) ⇒ BlockStream (constructor)
  Create a BlockStream object to sequentially interpret a stream of data.
- #path ⇒ String
  The IO path, if the backing IO is a file.
- #read(size) ⇒ Object (private)
  Read bytes.
- #read_block_header_raw ⇒ Object
  Read the bytes from the block header at the current position.
- #read_block_raw ⇒ (BlockHeader, String, String)
  Read the bytes from the block at the current position.
- #read_data_block(uncompress: true) ⇒ Object
  Read the marshalled version of a data block.
- #read_data_block_header ⇒ Object
  Read the header of one data block.
- #read_data_block_payload ⇒ Object
  Read the data payload of a data block, not parsing the header.
- #read_next_block_header ⇒ BlockHeader?
  Read the header of the next block.
- #read_payload(count = @payload_size) ⇒ Object
  Read the payload of the last block returned by #read_next_block_header.
- #read_prologue ⇒ Object
  If the IO is a file, it starts with a prologue to describe the file format.
- #read_prologue_raw ⇒ Object
  If the IO is a file, it starts with a prologue to describe the file format.
- #read_stream_block ⇒ Object
  Read one stream block.
- #rewind ⇒ Object
  Move to the beginning of the stream.
- #seek(pos) ⇒ Object
  Seek to the given raw position in the IO.
- #skip(count) ⇒ Object
  Skip that many bytes in the stream.
- #skip_payload ⇒ Object
- #tell ⇒ Object
  Current position in #io.
Constructor Details
#initialize(io, buffer_read: DEFAULT_BUFFER_READ) ⇒ BlockStream
Create a Pocolog::BlockStream object to sequentially interpret a stream of data
# File 'lib/pocolog/block_stream.rb', line 49
def initialize(io, buffer_read: DEFAULT_BUFFER_READ)
    @io = io
    @big_endian = nil
    @native_endian = nil
    @payload_size = 0
    @buffer_io = StringIO.new
    @buffer_read = buffer_read
end
Instance Attribute Details
#buffer_read ⇒ Integer (readonly)
The amount of bytes that should be read into the internal buffer
# File 'lib/pocolog/block_stream.rb', line 28
def buffer_read
    @buffer_read
end
#io ⇒ Object (readonly)
The underlying IO
# File 'lib/pocolog/block_stream.rb', line 23
def io
    @io
end
Class Method Details
.open(path) ⇒ BlockStream
Create a BlockStream object that acts on a given file
# File 'lib/pocolog/block_stream.rb', line 38
def self.open(path)
    if block_given?
        File.open(path) do |io|
            yield(new(io))
        end
    else
        new(File.open(path))
    end
end
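The two calling conventions of .open have the same shape as File.open: with a block, the file is closed once the block returns; without one, the caller owns the handle and has to call #close. The following is a minimal standalone sketch of that pattern. MiniStream and demo_open_patterns are hypothetical names used only for illustration and are not part of Pocolog.

```ruby
require 'tempfile'

# Hypothetical stand-in for a stream wrapper; not part of Pocolog.
class MiniStream
    attr_reader :io

    def initialize(io)
        @io = io
    end

    # Same shape as BlockStream.open: yield-and-close, or return an open object.
    def self.open(path)
        if block_given?
            File.open(path) { |io| yield(new(io)) }
        else
            new(File.open(path))
        end
    end

    def close
        io.close
    end

    def closed?
        io.closed?
    end
end

def demo_open_patterns
    file = Tempfile.new('mini_stream')
    file.write('abc')
    file.flush

    # Block form: the file is closed automatically once the block returns.
    leaked = nil
    MiniStream.open(file.path) { |stream| leaked = stream }
    closed_after_block = leaked.closed?

    # Non-block form: the caller owns the handle and must close it.
    stream = MiniStream.open(file.path)
    open_before_close = !stream.closed?
    stream.close

    [closed_after_block, open_before_close, stream.closed?]
ensure
    file&.close!
end
```

The block form is usually the safer choice for short-lived reads, since the file is closed even if the block raises.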
.read_block_header(io, pos = nil) ⇒ BlockHeader
# File 'lib/pocolog/block_stream.rb', line 173
def self.read_block_header(io, pos = nil)
    BlockHeader.parse(read_block_header_raw(io, pos))
end
.read_block_header_raw(io, pos = nil) ⇒ Object
# File 'lib/pocolog/block_stream.rb', line 167
def self.read_block_header_raw(io, pos = nil)
    io.seek(pos, IO::SEEK_SET) if pos
    io.read(BLOCK_HEADER_SIZE)
end
.read_stream_block(io, pos = nil) ⇒ Object
# File 'lib/pocolog/block_stream.rb', line 302
def self.read_stream_block(io, pos = nil)
    block = read_block(io, pos)
    if block.kind != STREAM_BLOCK
        raise InvalidFile, 'expected stream declaration block'
    end

    StreamBlock.parse(io.read(block.payload_size))
end
Instance Method Details
#big_endian? ⇒ Boolean
Whether the data in the file is stored in little or big endian
# File 'lib/pocolog/block_stream.rb', line 31
def big_endian?
    @big_endian
end
#close ⇒ Object
Close the file
# File 'lib/pocolog/block_stream.rb', line 97
def close
    io.close
end
#closed? ⇒ Boolean
Whether this stream is closed
# File 'lib/pocolog/block_stream.rb', line 85
def closed?
    io.closed?
end
#flush ⇒ Object
Flush buffers to the underlying backing store
# File 'lib/pocolog/block_stream.rb', line 90
def flush
    io.flush
end
#path ⇒ String
The IO path, if the backing IO is a file
# File 'lib/pocolog/block_stream.rb', line 78
def path
    io.path
end
#read(size) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Read bytes
# File 'lib/pocolog/block_stream.rb', line 118
def read(size)
    remaining =
        if (data = @buffer_io.read(size))
            (size - data.size)
        else
            size
        end

    return data if remaining == 0

    @buffer_io = StringIO.new(io.read([buffer_read, remaining].max) || '')
    if (buffer_data = @buffer_io.read(remaining))
        (data || '') + buffer_data
    else
        data
    end
end
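The buffering in #read can be easier to follow in isolation. The sketch below mirrors the same logic in a standalone class: serve what the in-memory buffer still holds, then refill it from the underlying IO with at least buffer_read bytes. ChunkedReader and its io_reads counter are hypothetical names added for illustration, and the tiny 4-byte buffer is chosen only to make the refills visible.

```ruby
require 'stringio'

# Hypothetical standalone reader mirroring the buffering logic of #read.
class ChunkedReader
    attr_reader :io, :buffer_read, :io_reads

    def initialize(io, buffer_read: 4)
        @io = io
        @buffer_read = buffer_read
        @buffer_io = StringIO.new
        @io_reads = 0 # counts reads on the underlying IO, for illustration
    end

    def read(size)
        # First serve whatever is still available in the in-memory buffer
        data = @buffer_io.read(size)
        remaining = data ? size - data.size : size
        return data if remaining == 0

        # Refill with at least buffer_read bytes, more if the request is larger
        @io_reads += 1
        @buffer_io = StringIO.new(io.read([buffer_read, remaining].max) || '')
        if (buffer_data = @buffer_io.read(remaining))
            (data || '') + buffer_data
        else
            data # short read or nil at end of stream
        end
    end
end
```

As in the original, a request that reaches the end of the stream returns fewer bytes than asked for, or nil when nothing is left, so callers have to check the size of the result.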
#read_block_header_raw ⇒ Object
Read the bytes from the block header at the current position
Unlike #read_next_block_header, it does not skip remaining payload bytes, and does not prepare the stream to read the block's payload with #read_payload
# File 'lib/pocolog/block_stream.rb', line 181
def read_block_header_raw
    return unless (header = read(BLOCK_HEADER_SIZE))

    if header.size != BLOCK_HEADER_SIZE
        raise NotEnoughData,
              "truncated block header (got #{header.size} bytes, "\
              "expected #{BLOCK_HEADER_SIZE})"
    end

    header
end
#read_block_raw ⇒ (BlockHeader, String, String)
Read the bytes from the block at the current position
# File 'lib/pocolog/block_stream.rb', line 209
def read_block_raw
    header_raw = read_block_header_raw
    header = BlockHeader.parse(header_raw)
    payload = read(header.payload_size)
    if !payload || payload.size != header.payload_size
        raise NotEnoughData,
              "expected to read #{header.payload_size} payload bytes but got "\
              "#{payload ? payload.size : 'EOF'}"
    end

    [header, header_raw, payload]
end
#read_data_block(uncompress: true) ⇒ Object
Read the marshalled version of a data block
It splits the block into its header and payload part, and optionally uncompresses the data sample
# File 'lib/pocolog/block_stream.rb', line 420
def read_data_block(uncompress: true)
    raw_header = read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
    raw_data = read_payload
    compressed = raw_header[-1, 1].unpack('C').first
    if uncompress && (compressed != 0)
        # Payload is compressed
        raw_data = Zlib::Inflate.inflate(raw_data)
    end
    [raw_header, raw_data]
end
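The header/payload split and the optional decompression can be tried on an in-memory string. In the sketch below, the last byte of the header acts as the compression flag, as in the listing above. SKETCH_HEADER_SIZE, split_data_block and build_sketch_block are hypothetical names, and the 5-byte header is an assumption made for the example; the real size is Format::Current::DATA_BLOCK_HEADER_SIZE.

```ruby
require 'zlib'

# Hypothetical header size for illustration only; the real value is
# Format::Current::DATA_BLOCK_HEADER_SIZE.
SKETCH_HEADER_SIZE = 5

# Split a raw data block into [raw_header, raw_data], inflating the data
# when the header's last byte is non-zero and uncompress is true.
def split_data_block(raw_block, uncompress: true)
    raw_header = raw_block[0, SKETCH_HEADER_SIZE]
    raw_data = raw_block[SKETCH_HEADER_SIZE..-1]
    compressed = raw_header[-1, 1].unpack('C').first
    raw_data = Zlib::Inflate.inflate(raw_data) if uncompress && compressed != 0
    [raw_header, raw_data]
end

# Build a block for the sketch: four placeholder bytes, then the one-byte
# compression flag, then the (possibly deflated) sample.
def build_sketch_block(sample, compress:)
    body = compress ? Zlib::Deflate.deflate(sample) : sample
    [0, compress ? 1 : 0].pack('NC') + body
end
```

Passing uncompress: false returns the data bytes exactly as stored, which can be useful when copying blocks from one file to another without paying for a decompression and recompression round trip.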
#read_data_block_header ⇒ Object
Read the header of one data block
The IO is assumed to be positioned at the beginning of the block’s payload
# File 'lib/pocolog/block_stream.rb', line 410
def read_data_block_header
    DataBlockHeader.parse(
        read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
    )
end
#read_data_block_payload ⇒ Object
Read the data payload of a data block, not parsing the header
The IO is assumed to be positioned just after the block header (i.e. after read_next_block_header)
# File 'lib/pocolog/block_stream.rb', line 435
def read_data_block_payload
    skip(Format::Current::DATA_BLOCK_HEADER_SIZE - 1)
    compressed = read_payload(1).unpack('C').first
    data = read_payload
    if compressed != 0
        # Payload is compressed
        data = Zlib::Inflate.inflate(data)
    end
    data
end
#read_next_block_header ⇒ BlockHeader?
Read the header of the next block
# File 'lib/pocolog/block_stream.rb', line 196
def read_next_block_header
    skip(@payload_size) if @payload_size != 0

    return unless (header = read_block_header_raw)

    block = BlockHeader.parse(header)
    @payload_size = block.payload_size
    block
end
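Because #read_next_block_header skips any unread payload before parsing the next header, a plain while loop is enough to enumerate every block without touching the payloads. The sketch below shows that pattern on a made-up format. The 5-byte header (one kind byte plus a 4-byte little-endian payload size) is an assumption for the example and is NOT the pocolog block layout; SketchBlockWalker, SketchHeader and sketch_block are hypothetical names.

```ruby
require 'stringio'

# Hypothetical 5-byte header (1-byte kind, 4-byte little-endian payload size).
# This is NOT the pocolog block layout, only a stand-in for the sketch.
SKETCH_BLOCK_HEADER_SIZE = 5
SketchHeader = Struct.new(:kind, :payload_size)

class SketchBlockWalker
    def initialize(io)
        @io = io
        @payload_size = 0
    end

    # Skip whatever is left of the previous payload, then parse the next
    # header. Returns nil at end of stream. The real method raises
    # NotEnoughData on a truncated header; this sketch simply stops.
    def read_next_block_header
        @io.seek(@payload_size, IO::SEEK_CUR) if @payload_size != 0
        raw = @io.read(SKETCH_BLOCK_HEADER_SIZE)
        return unless raw && raw.size == SKETCH_BLOCK_HEADER_SIZE

        header = SketchHeader.new(*raw.unpack('CV'))
        @payload_size = header.payload_size
        header
    end
end

# Serialize one block in the sketch format
def sketch_block(kind, payload)
    [kind, payload.bytesize].pack('CV') + payload
end
```

A caller would then loop with `while (header = walker.read_next_block_header)` and only read the payloads it is interested in.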
#read_payload(count = @payload_size) ⇒ Object
Read the payload of the last block returned by #read_next_block_header
# File 'lib/pocolog/block_stream.rb', line 320
def read_payload(count = @payload_size)
    if count > @payload_size
        raise ArgumentError,
              "expected read count #{count} greater than remaining "\
              "payload size #{@payload_size}"
    end

    result = read(count)
    if !result || result.size != count
        raise NotEnoughData,
              "expected to read #{count} bytes but got "\
              "#{result ? result.size : 'EOF'}"
    end

    @payload_size -= count
    result
end
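The bookkeeping in #read_payload allows a payload to be consumed in several pieces: each call decrements the remaining size, a call without argument reads whatever is left, and asking for more than what remains is rejected. A standalone sketch of that accounting follows. PayloadCursor is a hypothetical name, and EOFError stands in for the library's NotEnoughData so that the example has no dependency.

```ruby
require 'stringio'

# Hypothetical tracker mirroring the bookkeeping of #read_payload.
class PayloadCursor
    attr_reader :remaining

    def initialize(io, payload_size)
        @io = io
        @remaining = payload_size
    end

    # Read count bytes of the current payload (all that is left by default)
    def read_payload(count = @remaining)
        if count > @remaining
            raise ArgumentError,
                  "read count #{count} exceeds remaining payload size #{@remaining}"
        end

        result = @io.read(count)
        if !result || result.size != count
            raise EOFError,
                  "expected #{count} bytes but got #{result ? result.size : 'EOF'}"
        end

        @remaining -= count
        result
    end
end
```

This is the mechanism that lets #read_data_block first read a fixed-size data header and then call #read_payload with no argument to get the rest of the block.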
#read_prologue ⇒ Object
If the IO is a file, it starts with a prologue to describe the file format
Raises MissingPrologue if no prologue is found, or ObsoleteVersion if the file format is not up-to-date (in which case one has to run pocolog --to-new-format).
# File 'lib/pocolog/block_stream.rb', line 152
def read_prologue # :nodoc:
    big_endian = Format::Current.read_prologue(io)
    @format_version = Format::Current::VERSION
    @big_endian = big_endian
    @native_endian = ((big_endian != 0) ^ Pocolog.big_endian?)
    @payload_size = 0
end
#read_prologue_raw ⇒ Object
If the IO is a file, it starts with a prologue to describe the file format
Raises MissingPrologue if no prologue is found, or ObsoleteVersion if the file format is not up-to-date (in which case one has to run pocolog --to-new-format).
# File 'lib/pocolog/block_stream.rb', line 142
def read_prologue_raw # :nodoc:
    Format::Current.read_prologue_raw(io)
end
#read_stream_block ⇒ Object
Read one stream block
The IO is assumed to be positioned at the beginning of the stream declaration block's payload
# File 'lib/pocolog/block_stream.rb', line 314
def read_stream_block
    StreamBlock.parse(read_payload)
end
#rewind ⇒ Object
Move to the beginning of the stream
# File 'lib/pocolog/block_stream.rb', line 102
def rewind
    seek(0)
end
#seek(pos) ⇒ Object
Seek to the given raw position in the IO
The new position is assumed to be at the start of a block
# File 'lib/pocolog/block_stream.rb', line 109
def seek(pos)
    io.seek(pos)
    @buffer_io = StringIO.new
    @payload_size = 0
end
#skip(count) ⇒ Object
Skip that many bytes in the stream
# File 'lib/pocolog/block_stream.rb', line 64
def skip(count)
    buffer_remaining = (@buffer_io.size - @buffer_io.tell)
    if buffer_remaining < count
        @buffer_io.seek(buffer_remaining, IO::SEEK_CUR)
        io.seek(count - buffer_remaining, IO::SEEK_CUR)
    else
        @buffer_io.seek(count, IO::SEEK_CUR)
    end
    @payload_size -= count
end
#skip_payload ⇒ Object
# File 'lib/pocolog/block_stream.rb', line 338
def skip_payload
    skip(@payload_size)
end
#tell ⇒ Object
Current position in #io
# File 'lib/pocolog/block_stream.rb', line 59
def tell
    io.tell - (@buffer_io.size - @buffer_io.tell)
end
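Since reads go through an internal buffer, the raw position of the underlying IO is usually ahead of what the caller has actually consumed. #tell corrects for this by subtracting the bytes that were prefetched but not yet read. A minimal standalone sketch of that arithmetic, where logical_position and demo_logical_position are hypothetical helper names:

```ruby
require 'stringio'

# Logical position = raw IO position minus the bytes that are buffered
# but not yet consumed.
def logical_position(io, buffer_io)
    io.tell - (buffer_io.size - buffer_io.tell)
end

def demo_logical_position
    io = StringIO.new('0123456789')
    buffer_io = StringIO.new(io.read(8)) # prefetch 8 bytes; io.tell is now 8
    buffer_io.read(3)                    # the caller consumed only 3 of them
    logical_position(io, buffer_io)      # 8 - (8 - 3) = 3
end
```

The same subtraction explains why #seek has to reset the buffer: once the underlying IO moves, the prefetched bytes no longer correspond to the new position.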