Module: Pocolog::Format::V2

Defined in:
lib/pocolog/format/v2.rb

Overview

Implementation of the V2 log format, including the corresponding index file

Defined Under Namespace

Classes: IndexStreamInfo

Constant Summary collapse

MAGIC =

The magic code present at the beginning of each pocolog file

"POCOSIM"
VERSION =

Format version ID. Increment this when the file format changes in a non-backward-compatible way

2
PROLOGUE_SIZE =

The size of the file’s prologue

MAGIC.size + 9
INDEX_MAGIC =

The magic code at the beginning of a pocolog index

"POCOSIM_INDEX"
INDEX_VERSION =

The current index version. Unlike with the format version, a changing index version will only cause rebuilding the index

(i.e. this can change without changing the overall format version)

3
INDEX_PROLOGUE_SIZE =

Size of the index prologue

INDEX_MAGIC.size + 20
INDEX_STREAM_COUNT_POS =

Position of the stream count field

INDEX_PROLOGUE_SIZE
INDEX_STREAM_COUNT_SIZE =

Size of the stream count field

8
INDEX_STREAM_DESCRIPTION_SIZE =

Size of a stream description in the index

8 * 8
INDEX_STREAM_ENTRY_SIZE =

Size of an entry in the index table

8 * 2
BLOCK_HEADER_SIZE =

The size of the generic block header

8
TIME_SIZE =

The size of a time in a block header

8
DATA_BLOCK_HEADER_SIZE =

The size of a data header, excluding the generic block header

TIME_SIZE * 2 + 5
STREAM_BLOCK_DECLARATION_HEADER_SIZE_MIN =

The size of a stream block declaration header

Stream declarations contain variable-length strings, we can only have a min

9

Class Method Summary collapse

Class Method Details

.index_file_valid?(index_path, file_path) ⇒ Boolean

Tests whether the index whose path is given is valid for the given log file

Returns:

  • (Boolean)


287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/pocolog/format/v2.rb', line 287

def self.index_file_valid?(index_path, file_path)
    stat = file_path.stat
    begin
        File.open(index_path) do |index_io|
            streams_info = read_index_minimal_info(
                index_io, expected_file_size: stat.size
            )
            index_file_validate_size(index_io, streams_info)
        end
        true
    rescue Errno::ENOENT
        false
    end
rescue InvalidIndex
    false
end

.index_file_validate_size(index_io, streams_info) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate that an index’ file size match the value expected from its stream info section

Parameters:

Raises:



311
312
313
314
315
316
317
318
319
320
321
# File 'lib/pocolog/format/v2.rb', line 311

def self.index_file_validate_size(index_io, streams_info)
    index_end_pos = streams_info.map do |s|
        s.stream_size * INDEX_STREAM_ENTRY_SIZE + s.index_pos
    end
    expected_file_size = index_end_pos.max
    return if index_io.size == expected_file_size

    raise InvalidIndex,
          "index file should be of size #{expected_file_size} "\
          "but is of size #{index_io.size}"
end

.index_stream_info(stream_info, index_data_pos) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper method that prepares index contents for a given stream



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/pocolog/format/v2.rb', line 453

def self.index_stream_info(stream_info, index_data_pos)
    interval_rt = stream_info.interval_rt
    interval_lg = stream_info.interval_lg
    base_time   = stream_info.index.base_time
    IndexStreamInfo.new(
        declaration_pos: stream_info.declaration_blocks.first,
        index_pos: index_data_pos,
        base_time: base_time || 0,
        stream_size: stream_info.size,
        rt_min: interval_rt[0] || 0,
        rt_max: interval_rt[1] || 0,
        lg_min: interval_lg[0] || 0,
        lg_max: interval_lg[1] || 0
    )
end

.read_index(index_io, expected_file_size: nil, expected_mtime: nil) ⇒ Array<StreamInfo>

Read the information contained in a file index

Returns:

  • (Array<StreamInfo>)

    the information contained in the index file



182
183
184
185
186
187
188
189
# File 'lib/pocolog/format/v2.rb', line 182

def self.read_index(index_io, expected_file_size: nil, expected_mtime: nil)
    index_stream_info = read_index_minimal_info(
        index_io,
        expected_file_size: expected_file_size,
        expected_mtime: expected_mtime
    )
    index_stream_info.map { read_stream_info(index_io, _1) }
end

.read_index_header(index_io, expected_file_size: nil, expected_mtime: nil) ⇒ Integer

Read and optionally validate the header information from an index file

Returns:

  • (Integer)

    the amount of streams in the index

See Also:



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/pocolog/format/v2.rb', line 342

def self.read_index_header(
    index_io, expected_file_size: nil, expected_mtime: nil
)
    read_index_prologue(index_io,
                        validate_version: true,
                        expected_mtime: expected_mtime,
                        expected_file_size: expected_file_size)

    stream_count_data = index_io.read(8)

    if stream_count_data.size < 8
        raise InvalidIndex, "index file too small"
    end
    stream_count_data.unpack1("Q>")
end

.read_index_minimal_info(index_io, expected_file_size: nil, expected_mtime: nil) ⇒ Array<IndexStreamInfo>

Read the stream definitions from an index, but no the index data

Returns:



326
327
328
329
330
331
332
333
334
335
336
# File 'lib/pocolog/format/v2.rb', line 326

def self.read_index_minimal_info(
    index_io, expected_file_size: nil, expected_mtime: nil
)
    stream_count = read_index_header(
        index_io,
        expected_mtime: expected_mtime,
        expected_file_size: expected_file_size
    )

    read_index_stream_info(index_io, stream_count)
end

.read_index_prologue(index_io, validate_version: true, expected_mtime: nil, expected_file_size: nil) ⇒ Integer

Read the prologue of an index file

Parameters:

  • validate_version (Boolean) (defaults to: true)

    if true, the method will raise if the file version does not match INDEX_VERSION

Returns:

  • (Integer)

    the index file version



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/pocolog/format/v2.rb', line 118

def self.read_index_prologue(
    index_io, validate_version: true,
    expected_mtime: nil, expected_file_size: nil
)
    if index_io.size < INDEX_PROLOGUE_SIZE
        raise InvalidIndex, "index file too small to contain a valid info"
    end

    header = index_io.read(INDEX_MAGIC.size + 4)
    magic = header[0, INDEX_MAGIC.size]
    if magic != INDEX_MAGIC
        message =
            if magic
                "wrong index magic in #{index_io.path}, "\
                "probably an old index"
            else
                "#{index_io.path} is empty"
            end

        raise MissingIndexPrologue, message
    end

    index_version = Integer(header[INDEX_MAGIC.size, 4].unpack1("L>"))
    if validate_version
        if index_version < INDEX_VERSION
            raise ObsoleteIndexVersion,
                  "old format #{index_version}, "\
                  "current format is #{INDEX_VERSION}"
        elsif index_version > INDEX_VERSION
            raise InvalidIndex,
                  "old format #{index_version}, "\
                  "current format is #{INDEX_VERSION}"
        end
    end

    index_size, index_mtime = index_io.read(16).unpack("Q>Q>")
    if expected_file_size && expected_file_size != index_size
        raise InvalidIndex,
              "file size in index (#{index_size}) and actual file "\
              "size (#{expected_file_size}) mismatch"
    end
    if expected_mtime
        expected_mtime_i = StreamIndex.time_to_internal(expected_mtime, 0)
        if expected_mtime_i != index_mtime
            raise InvalidIndex,
                  "mtime in index (#{index_mtime}) and actual mtime "\
                  "(#{expected_mtime_i}) mismatch"
        end
    end
    [index_version, index_size,
     StreamIndex.time_from_internal(index_mtime, 0)]
end

.read_index_single_stream_info(index_io) ⇒ IndexStreamInfo

Read from IO the index stream info for a single stream

Returns:



372
373
374
375
376
377
378
379
380
# File 'lib/pocolog/format/v2.rb', line 372

def self.read_index_single_stream_info(index_io)
    data = index_io.read(INDEX_STREAM_DESCRIPTION_SIZE)
    if !data || data.size < INDEX_STREAM_DESCRIPTION_SIZE
        raise InvalidIndex,
              "not enough data to read stream description in index"
    end

    IndexStreamInfo.unmarshal(data)
end

.read_index_stream_info(index_io, stream_count) ⇒ Array<IndexStreamInfo>

Read basic stream information from an index file

Parameters:

  • index_io (IO)

    the index IO

Returns:

  • (Array<IndexStreamInfo>)

    the stream-related information contained in the index file



363
364
365
366
367
# File 'lib/pocolog/format/v2.rb', line 363

def self.read_index_stream_info(index_io, stream_count)
    stream_count.times.map do
        read_index_single_stream_info(index_io)
    end
end

.read_minimal_info(index_io, file_io, validate: true) ⇒ Array<(BlockStream::StreamBlock,IndexStreamInfo)>

Read the full stream definition as well as index data for a string without reading any actual data (no index data and no stream data)



214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/pocolog/format/v2.rb', line 214

def self.read_minimal_info(index_io, file_io, validate: true)
    index_stream_info = read_index_minimal_info(
        index_io,
        expected_file_size: (file_io.size if validate)
    )

    index_stream_info.map do |info|
        file_io.seek(info.declaration_pos)
        block_stream = BlockStream.new(file_io)
        block_stream.read_next_block_header
        stream_block = block_stream.read_stream_block
        [stream_block, info]
    end
end

.read_prologue(io, validate_version: true) ⇒ (Integer,Boolean)

Read a file’s prologue

Parameters:

  • io (IO)

    the file from which to read the prologue

  • validate_version (Boolean) (defaults to: true)

    if true, the method will raise if the file version does not match INDEX_VERSION

Returns:

  • ((Integer,Boolean))

    the file format version and a flag that tells whether the file’s data is encoded as big or little endian



66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pocolog/format/v2.rb', line 66

def self.read_prologue(io, validate_version: true)
    header = read_prologue_raw(io)
    magic = header[0, MAGIC.size]
    if magic != MAGIC
        raise MissingPrologue,
              "#{io.path} is not a pocolog log file. "\
              "Got #{magic} at #{io.tell}, but was expecting #{MAGIC}"
    end

    format_version, big_endian = header[MAGIC.size, 9].unpack("xVV")
    validate_version(format_version) if validate_version
    [format_version, big_endian]
end

.read_prologue_raw(io) ⇒ String

Read the raw bytes from the prologue and return them

Returns:

  • (String)

Raises:

  • MissingPrologue



50
51
52
53
54
55
56
57
# File 'lib/pocolog/format/v2.rb', line 50

def self.read_prologue_raw(io)
    header = io.read(PROLOGUE_SIZE)
    if !header || (header.size < PROLOGUE_SIZE)
        raise MissingPrologue, "#{io.path} too small"
    end

    header
end

.read_stream_info(index_io, info) ⇒ Object

Read StreamInfo from the index IO based on the corresponding IndexStreamInfo

Parameters:



196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/pocolog/format/v2.rb', line 196

def self.read_stream_info(index_io, info)
    index_size = info.stream_size * INDEX_STREAM_ENTRY_SIZE
    index_data = index_io.pread(index_size, info.index_pos)
    if index_data.size != index_size
        raise InvalidIndex, "index file seem truncated"
    end

    index_data = index_data.unpack("Q>*")
    StreamInfo.from_raw_data(
        info.declaration_pos, info.interval_rt, info.base_time,
        index_data
    )
end

.rebuild_index_file(io, index_path) ⇒ Array<StreamInfo,nil>

Rebuild a pocolog file’s index and saves it to file

Parameters:

  • io (File)

    the pocolog file IO

  • index_path (String)

    the path into which the index should be saved

Returns:



388
389
390
391
392
393
394
395
396
397
# File 'lib/pocolog/format/v2.rb', line 388

def self.rebuild_index_file(io, index_path)
    block_stream = BlockStream.new(io)
    block_stream.read_prologue
    stream_info = Pocolog.file_index_builder(block_stream)
    FileUtils.mkdir_p(File.dirname(index_path))
    File.open(index_path, "w") do |index_io|
        write_index(index_io, io, stream_info)
    end
    stream_info
end

.valid_file?(file) ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
101
102
103
104
105
# File 'lib/pocolog/format/v2.rb', line 98

def self.valid_file?(file)
    File.open(file) do |io|
        read_prologue(io)
        true
    end
rescue InvalidFile
    false
end

.validate_version(version) ⇒ Object

Verify that the given version is compatible with this format version

Raises:

  • ObsoleteVersion if the version is older than VERSION and cannot be loaded by this code

  • InvalidFile if the version is newer than VERSION



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/pocolog/format/v2.rb', line 85

def self.validate_version(version)
    if version < VERSION
        raise ObsoleteVersion,
              "old format #{version}, current format "\
              "is #{VERSION}. Convert it using the "\
              "--to-new-format of pocolog"
    elsif version > VERSION
        raise InvalidFile,
              "this file is in v#{version} which is "\
              "newer that the one we know #{VERSION}. Update pocolog"
    end
end

.write_index(index_io, file_io, streams, version: INDEX_VERSION) ⇒ Object

Write an index file for a given file

Parameters:

  • file_io (File)

    the file that is being indexed. It cannot be a IOSequence

  • index_io (File)

    the file into which the index should be written

  • streams (Array<StreamInfo>)

    the stream information that should be stored



407
408
409
410
411
412
413
414
415
416
417
# File 'lib/pocolog/format/v2.rb', line 407

def self.write_index(index_io, file_io, streams, version: INDEX_VERSION)
    if index_io.path == file_io.path
        raise ArgumentError, "attempting to overwrite the file by its index"
    end

    size = file_io.stat.size
    mtime = file_io.stat.mtime
    write_index_header(index_io, size, mtime, streams.size, version: version)
    write_index_stream_info(index_io, streams)
    write_index_stream_data(index_io, streams)
end

.write_index_header(index_io, size, mtime, stream_count, version: INDEX_VERSION) ⇒ Object

Write index file information before the actual index data



420
421
422
423
424
425
# File 'lib/pocolog/format/v2.rb', line 420

def self.write_index_header(
    index_io, size, mtime, stream_count, version: INDEX_VERSION
)
    write_index_prologue(index_io, size, mtime, version: version)
    index_io.write([stream_count].pack("Q>"))
end

.write_index_prologue(index_io, size, mtime, version: INDEX_VERSION) ⇒ Object

Write a prologue on an index file



172
173
174
175
176
# File 'lib/pocolog/format/v2.rb', line 172

def self.write_index_prologue(index_io, size, mtime, version: INDEX_VERSION)
    index_io.write(INDEX_MAGIC)
    data = [version, size, StreamIndex.time_to_internal(mtime, 0)]
    index_io.write(data.pack("L>Q>Q>"))
end

.write_index_single_stream_info(index_io, stream_info, index_data_pos) ⇒ Object



437
438
439
440
# File 'lib/pocolog/format/v2.rb', line 437

def self.write_index_single_stream_info(index_io, stream_info, index_data_pos)
    index_stream_info = index_stream_info(stream_info, index_data_pos)
    index_io.write(index_stream_info.marshal)
end

.write_index_stream_data(index_io, streams) ⇒ Object

Write the stream data part of a file index for all given streams



443
444
445
446
447
448
# File 'lib/pocolog/format/v2.rb', line 443

def self.write_index_stream_data(index_io, streams)
    streams.each do |stream_info|
        index_map = stream_info.index.index_map
        index_io.write(index_map.pack("Q>*"))
    end
end

.write_index_stream_info(index_io, streams) ⇒ Object

Write the stream info part of a file index for all given streams



428
429
430
431
432
433
434
435
# File 'lib/pocolog/format/v2.rb', line 428

def self.write_index_stream_info(index_io, streams)
    index_data_pos = INDEX_STREAM_DESCRIPTION_SIZE * streams.size +
                     index_io.tell
    streams.each do |stream_info|
        write_index_single_stream_info(index_io, stream_info, index_data_pos)
        index_data_pos += stream_info.index.index_map.size * 8
    end
end

.write_prologue(io, big_endian = Pocolog.big_endian?) ⇒ Object

Write a v2 file prologue



108
109
110
111
# File 'lib/pocolog/format/v2.rb', line 108

def self.write_prologue(io, big_endian = Pocolog.big_endian?)
    io.write(MAGIC)
    io.write(*[VERSION, big_endian ? 1 : 0].pack("xVV"))
end