Class: Pocolog::BlockStream

Inherits:
Object
  • Object
show all
Defined in:
lib/pocolog/block_stream.rb

Overview

Enumeration of blocks in a pocolog-compatible stream

Defined Under Namespace

Classes: BlockHeader, DataBlockHeader, StreamBlock

Constant Summary collapse

FORMAT_VERSION =
Format::Current::VERSION
BLOCK_HEADER_SIZE =

The size of the generic block header

Format::Current::BLOCK_HEADER_SIZE
TIME_SIZE =

The size of a time in a block header

Format::Current::TIME_SIZE
MAGIC =

Magic code at the beginning of the log file

Format::Current::MAGIC
DEFAULT_BUFFER_READ =

Read by 4kB chunks by default

4 * 1024

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, buffer_read: DEFAULT_BUFFER_READ) ⇒ BlockStream

Create a Pocolog::BlockStream object to sequentially interpret a stream of data



49
50
51
52
53
54
55
56
# File 'lib/pocolog/block_stream.rb', line 49

def initialize(io, buffer_read: DEFAULT_BUFFER_READ)
    @io = io
    @big_endian = nil
    @native_endian = nil
    @payload_size = 0
    @buffer_io = StringIO.new
    @buffer_read = buffer_read
end

Instance Attribute Details

#buffer_readInteger (readonly)

The amount of bytes that should be read into the internal buffer

Returns:

  • (Integer)


28
29
30
# File 'lib/pocolog/block_stream.rb', line 28

def buffer_read
  @buffer_read
end

#ioObject (readonly)

The underlying IO



23
24
25
# File 'lib/pocolog/block_stream.rb', line 23

def io
  @io
end

Class Method Details

.open(path) ⇒ BlockStream

Create a BlockStream object that acts on a given file

Returns:



38
39
40
41
42
43
44
45
46
# File 'lib/pocolog/block_stream.rb', line 38

def self.open(path)
    if block_given?
        File.open(path) do |io|
            yield(new(io))
        end
    else
        new(File.open(path))
    end
end

.read_block_header(io, pos = nil) ⇒ BlockHeader

Returns:



173
174
175
# File 'lib/pocolog/block_stream.rb', line 173

def self.read_block_header(io, pos = nil)
    BlockHeader.parse(read_block_header_raw(io, pos))
end

.read_block_header_raw(io, pos = nil) ⇒ Object



167
168
169
170
# File 'lib/pocolog/block_stream.rb', line 167

def self.read_block_header_raw(io, pos = nil)
    io.seek(pos, IO::SEEK_SET) if pos
    io.read(BLOCK_HEADER_SIZE)
end

.read_stream_block(io, pos = nil) ⇒ Object



302
303
304
305
306
307
308
309
# File 'lib/pocolog/block_stream.rb', line 302

def self.read_stream_block(io, pos = nil)
    block = read_block(io, pos)
    if block.kind != STREAM_BLOCK
        raise InvalidFile, 'expected stream declaration block'
    end

    StreamBlock.parse(io.read(block.payload_size))
end

Instance Method Details

#big_endian?Boolean

Whether the data in the file is stored in little or big endian

Returns:

  • (Boolean)


31
32
33
# File 'lib/pocolog/block_stream.rb', line 31

def big_endian?
    @big_endian
end

#closeObject

Close the file

See Also:



97
98
99
# File 'lib/pocolog/block_stream.rb', line 97

def close
    io.close
end

#closed?Boolean

Whether this stream is closed

Returns:

  • (Boolean)

See Also:



85
86
87
# File 'lib/pocolog/block_stream.rb', line 85

def closed?
    io.closed?
end

#flushObject

Flush buffers to the underlying backing store



90
91
92
# File 'lib/pocolog/block_stream.rb', line 90

def flush
    io.flush
end

#pathString

The IO path, if the backing IO is a file

Returns:

  • (String)


78
79
80
# File 'lib/pocolog/block_stream.rb', line 78

def path
    io.path
end

#read(size) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Read bytes



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/pocolog/block_stream.rb', line 118

def read(size)
    remaining =
        if (data = @buffer_io.read(size))
            (size - data.size)
        else
            size
        end

    return data if remaining == 0

    @buffer_io = StringIO.new(io.read([buffer_read, remaining].max) || '')
    if (buffer_data = @buffer_io.read(remaining))
        (data || '') + buffer_data
    else
        data
    end
end

#read_block_header_rawObject

Read the bytes from the block header at the current position

Unlike #read_next_block_header, it does not skip remaining payload bytes, and does not prepare the stream to



181
182
183
184
185
186
187
188
189
190
191
# File 'lib/pocolog/block_stream.rb', line 181

def read_block_header_raw
    return unless (header = read(BLOCK_HEADER_SIZE))

    if header.size != BLOCK_HEADER_SIZE
        raise NotEnoughData,
              "truncated block header (got #{header.size} bytes, "\
              "expected #{BLOCK_HEADER_SIZE})"
    end

    header
end

#read_block_raw(BlockHeader, String, String)

Read the bytes from the block at the current position

Returns:



209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/pocolog/block_stream.rb', line 209

def read_block_raw
    header_raw = read_block_header_raw
    header = BlockHeader.parse(header_raw)
    payload = read(header.payload_size)
    if !payload || payload.size != header.payload_size
        raise NotEnoughData,
              "expected to read #{header.payload_size} payload bytes but got "\
              "#{payload ? payload.size : 'EOF'}"
    end

    [header, header_raw, payload]
end

#read_data_block(uncompress: true) ⇒ Object

Read the marshalled version of a data block

It splits the block into its header and payload part, and optionally uncompresses the data sample



420
421
422
423
424
425
426
427
428
429
# File 'lib/pocolog/block_stream.rb', line 420

def read_data_block(uncompress: true)
    raw_header = read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
    raw_data   = read_payload
    compressed = raw_header[-1, 1].unpack('C').first
    if uncompress && (compressed != 0)
        # Payload is compressed
        raw_data = Zlib::Inflate.inflate(raw_data)
    end
    [raw_header, raw_data]
end

#read_data_block_headerObject

Read the header of one data block

The IO is assumed to be positioned at the beginning of the block’s payload



410
411
412
413
414
# File 'lib/pocolog/block_stream.rb', line 410

def read_data_block_header
    DataBlockHeader.parse(
        read_payload(Format::Current::DATA_BLOCK_HEADER_SIZE)
    )
end

#read_data_block_payloadObject

Read the data payload of a data block, not parsing the header

The IO is assumed to be positioned just after the block header (i.e. after read_next_block_header)



435
436
437
438
439
440
441
442
443
444
# File 'lib/pocolog/block_stream.rb', line 435

def read_data_block_payload
    skip(Format::Current::DATA_BLOCK_HEADER_SIZE - 1)
    compressed = read_payload(1).unpack('C').first
    data = read_payload
    if compressed != 0
        # Payload is compressed
        data = Zlib::Inflate.inflate(data)
    end
    data
end

#read_next_block_headerBlockHeader?

Read the header of the next block

Returns:



196
197
198
199
200
201
202
203
204
# File 'lib/pocolog/block_stream.rb', line 196

def read_next_block_header
    skip(@payload_size) if @payload_size != 0

    return unless (header = read_block_header_raw)

    block = BlockHeader.parse(header)
    @payload_size = block.payload_size
    block
end

#read_payload(count = @payload_size) ⇒ Object

Read the payload of the last block returned by #read_next_block_header



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/pocolog/block_stream.rb', line 320

def read_payload(count = @payload_size)
    if count > @payload_size
        raise ArgumentError,
              "expected read count #{count} greater than remaining "\
              "payload size #{@payload_size}"
    end

    result = read(count)
    if !result || result.size != count
        raise NotEnoughData,
              "expected to read #{count} bytes but got "\
              "#{result ? result.size : 'EOF'}"
    end

    @payload_size -= count
    result
end

#read_prologueObject

If the IO is a file, it starts with a prologue to describe the file format

Raises MissingPrologue if no prologue is found, or ObsoleteVersion if the file format is not up-to-date (in which case one has to run pocolog –to-new-format).



152
153
154
155
156
157
158
# File 'lib/pocolog/block_stream.rb', line 152

def read_prologue # :nodoc:
    big_endian = Format::Current.read_prologue(io)
    @format_version = Format::Current::VERSION
    @big_endian = big_endian
    @native_endian = ((big_endian != 0) ^ Pocolog.big_endian?)
    @payload_size = 0
end

#read_prologue_rawObject

If the IO is a file, it starts with a prologue to describe the file format

Raises MissingPrologue if no prologue is found, or ObsoleteVersion if the file format is not up-to-date (in which case one has to run pocolog –to-new-format).



142
143
144
# File 'lib/pocolog/block_stream.rb', line 142

def read_prologue_raw # :nodoc:
    Format::Current.read_prologue_raw(io)
end

#read_stream_blockObject

Read one stream block

The IO is assumed to be positioned at the stream definition’s block’s payload



314
315
316
# File 'lib/pocolog/block_stream.rb', line 314

def read_stream_block
    StreamBlock.parse(read_payload)
end

#rewindObject

Move to the beginning of the stream



102
103
104
# File 'lib/pocolog/block_stream.rb', line 102

def rewind
    seek(0)
end

#seek(pos) ⇒ Object

Seek to the current raw position in the IO

The new position is assumed to be at the start of a block



109
110
111
112
113
# File 'lib/pocolog/block_stream.rb', line 109

def seek(pos)
    io.seek(pos)
    @buffer_io = StringIO.new
    @payload_size = 0
end

#skip(count) ⇒ Object

Skip that many bytes in the stream



64
65
66
67
68
69
70
71
72
73
# File 'lib/pocolog/block_stream.rb', line 64

def skip(count)
    buffer_remaining = (@buffer_io.size - @buffer_io.tell)
    if buffer_remaining < count
        @buffer_io.seek(buffer_remaining, IO::SEEK_CUR)
        io.seek(count - buffer_remaining, IO::SEEK_CUR)
    else
        @buffer_io.seek(count, IO::SEEK_CUR)
    end
    @payload_size -= count
end

#skip_payloadObject



338
339
340
# File 'lib/pocolog/block_stream.rb', line 338

def skip_payload
    skip(@payload_size)
end

#tellObject

Current position in #io



59
60
61
# File 'lib/pocolog/block_stream.rb', line 59

def tell
    io.tell - (@buffer_io.size - @buffer_io.tell)
end