Class: Pocolog::DataStream
- Inherits: Object
- Defined in: lib/pocolog/data_stream.rb
Overview
Interface for reading a stream in a Pocolog::Logfiles
Instance Attribute Summary
-
#index ⇒ Object
readonly
Returns the value of attribute index.
-
#info ⇒ Object
readonly
The StreamInfo structure for that stream.
-
#logfile ⇒ Object
readonly
Returns the value of attribute logfile.
-
#metadata ⇒ Object
The stream associated metadata.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#sample_index ⇒ Object
readonly
The index in the stream of the last read sample.
-
#time_getter ⇒ Object
Returns the value of attribute time_getter.
-
#type ⇒ Object
readonly
Returns the value of attribute type.
Instance Method Summary
-
#[](sample_index) ⇒ Object
Returns the sample_index sample of this stream.
-
#advance ⇒ Object
Reads the next sample in the file, and returns its header.
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#copy_to(start_index = 0, end_index = size, stream, &block) ⇒ Object
call-seq: copy_to(index1,index2,stream) => true copy_to(time1,time2,stream) => true.
- #data(data_header = nil) ⇒ Object
-
#data_header ⇒ Object
The data header for the current sample.
-
#duration_lg ⇒ Float
Returns this stream’s duration in seconds.
-
#each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream, converting the typelib data.
-
#each_block(rewind = true) { ... } ⇒ Object
Enumerates the blocks of this stream.
-
#empty? ⇒ Boolean
True if the size of this stream is zero.
-
#eof? ⇒ Boolean
True if we read past the last sample.
-
#first ⇒ Object
call-seq: first => [time_rt, time_lg, data].
-
#from_logical_time(time) ⇒ Object
Return a new DataStream from which all samples before the given logical time are removed.
-
#initialize(logfile, index, name, stream_type, metadata = {}, info = StreamInfo.new) ⇒ DataStream
constructor
A new instance of DataStream.
-
#interval_lg ⇒ (Time,Time), ()
Return the logical time of the first and last samples in this stream.
-
#interval_rt ⇒ (Time,Time), ()
Return the realtime of the first and last samples in this stream.
-
#last ⇒ Object
call-seq: last => [time_rt, time_lg, data].
-
#next ⇒ Object
call-seq: next => [time_rt, time_lg, data].
- #open ⇒ Object
-
#previous ⇒ Object
call-seq: previous => [time_rt, time_lg, data].
-
#raw_data(data_header = nil, sample = nil) ⇒ Object
Returns the decoded data sample associated with the given block header.
-
#raw_each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream without converting the typelib data.
- #read_one_data_sample(position) ⇒ Object
- #read_one_raw_data_sample(position, sample = nil) ⇒ Object
-
#registry ⇒ Object
Get the Typelib::Registry object for this stream.
-
#resample_by_index(samples) ⇒ Object
Return a new DataStream with only the N-th sample.
-
#resample_by_time(period, start_time: nil) ⇒ Object
Return a new DataStream resampled by time, using the given period.
-
#rewind ⇒ Object
call-seq: rewind => data_header.
-
#samples(read_data = true) ⇒ Object
Returns a SampleEnumerator object for this stream.
-
#samples?(start_index, end_index) ⇒ Boolean
call-seq: samples?(pos1,pos2) => true samples?(time1,time2) => true.
-
#seek(pos, decode_data = true) ⇒ Object
Seek the stream at the given position.
-
#size ⇒ Object
The size, in samples, of data in this stream.
- #stream_index ⇒ Object
-
#sub_field(fieldname, data_header = nil) ⇒ Object
Returns the decoded subfield specified by ‘fieldname’ for the given data header.
-
#time ⇒ Object
Returns the time of the current sample.
-
#time_interval(rt = false) ⇒ Object
Get the logical time of first and last samples in this stream.
-
#to_logical_time(time) ⇒ Object
Return a new DataStream from which all samples after the given logical time are removed.
- #type_name ⇒ Object
- #typename ⇒ Object
-
#updated_datastream_from_index(stream_index) ⇒ Object
private
Re-creates a “copy” of this new datastream using an updated index.
-
#write(rt, lg, data) ⇒ Object
Write a sample in this stream, with the rt and lg timestamps.
-
#write_raw(rt, lg, data) ⇒ Object
Write an already marshalled sample.
Constructor Details
#initialize(logfile, index, name, stream_type, metadata = {}, info = StreamInfo.new) ⇒ DataStream
Returns a new instance of DataStream.
# File 'lib/pocolog/data_stream.rb', line 28
def initialize(logfile, index, name, stream_type, metadata = {}, info = StreamInfo.new)
    @logfile = logfile
    @index = Integer(index)
    @name = name.to_str
    @metadata = metadata
    @info = info

    # if we do have a registry, then adapt it to the local machine
    # if needed. Right now, this is required if containers changed
    # size.
    registry = stream_type.registry
    resize_containers = Hash.new
    registry.each do |type|
        if type <= Typelib::ContainerType && type.size != type.natural_size
            resize_containers[type] = type.natural_size
        end
    end
    if resize_containers.empty?
        @type = stream_type
    else
        registry.resize(resize_containers)
        @type = registry.get(stream_type.name)
    end

    @data = nil
    @sample_index = -1
end
Instance Attribute Details
#index ⇒ Object (readonly)
Returns the value of attribute index.
# File 'lib/pocolog/data_stream.rb', line 5
def index
    @index
end
#info ⇒ Object (readonly)
The StreamInfo structure for that stream
# File 'lib/pocolog/data_stream.rb', line 11
def info
    @info
end
#logfile ⇒ Object (readonly)
Returns the value of attribute logfile.
# File 'lib/pocolog/data_stream.rb', line 4
def logfile
    @logfile
end
#metadata ⇒ Object
The stream associated metadata
# File 'lib/pocolog/data_stream.rb', line 18
def metadata
    @metadata
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/pocolog/data_stream.rb', line 6
def name
    @name
end
#sample_index ⇒ Object (readonly)
The index in the stream of the last read sample
It is equal to size if we are past-the-end, i.e. if one called #next until it returned nil
# File 'lib/pocolog/data_stream.rb', line 16
def sample_index
    @sample_index
end
#time_getter ⇒ Object
Returns the value of attribute time_getter.
# File 'lib/pocolog/data_stream.rb', line 172
def time_getter
    @time_getter
end
#type ⇒ Object (readonly)
Returns the value of attribute type.
# File 'lib/pocolog/data_stream.rb', line 8
def type
    @type
end
Instance Method Details
#[](sample_index) ⇒ Object
Returns the sample_index sample of this stream.
# File 'lib/pocolog/data_stream.rb', line 168
def [](sample_index)
    seek(sample_index)
end
#advance ⇒ Object
Reads the next sample in the file, and returns its header. Returns nil if the end of file has been reached. Unlike #next, it does not decode the data payload.
# File 'lib/pocolog/data_stream.rb', line 394
def advance
    if sample_index < size - 1
        @sample_index += 1
        file_pos = stream_index.file_position_by_sample_number(@sample_index)
        logfile.read_one_block(file_pos)
        return logfile.data_header
    else
        @sample_index = size
    end
    nil
end
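The cursor bookkeeping in #advance can be illustrated without a log file. The sketch below uses a hypothetical in-memory stand-in (ToyCursor is not part of Pocolog) that mirrors only how sample_index moves: -1 before the first read, and size once the end has been passed.

```ruby
# Hypothetical stand-in, NOT part of Pocolog: it mirrors only the
# sample_index bookkeeping of DataStream#rewind and #advance.
class ToyCursor
  attr_reader :sample_index

  def initialize(samples)
    @samples = samples
    @sample_index = -1
  end

  def size
    @samples.size
  end

  # Same convention as DataStream#eof?
  def eof?
    size == sample_index
  end

  def rewind
    @sample_index = -1
    nil
  end

  # Moves to the next sample and returns it, or returns nil and parks
  # the cursor at `size` once the end has been passed.
  def advance
    if sample_index < size - 1
      @sample_index += 1
      return @samples[@sample_index]
    end
    @sample_index = size
    nil
  end
end

cursor = ToyCursor.new(%w[a b c])
visited = []
while (sample = cursor.advance)
  visited << [cursor.sample_index, sample]
end
```

Once the loop ends, the cursor sits at size, which is the state that #eof? tests for.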
#close ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 109
def close
    logfile.close
end
#closed? ⇒ Boolean
# File 'lib/pocolog/data_stream.rb', line 101
def closed?
    logfile.closed?
end
#copy_to(start_index = 0, end_index = size, stream, &block) ⇒ Object
call-seq:
copy_to(index1,index2,stream) => true
copy_to(time1,time2,stream) => true
Copies all blocks from start_index/time to end_index/time to the given stream. If a code block is given, it is called before each block is copied, with the number of blocks copied so far. If the code block returns a true value (anything other than nil or false), the copy is cancelled and the method returns false. When the copy runs to completion, the source listed here returns the number of blocks copied.
The given interval is automatically truncated if it is too big.
# File 'lib/pocolog/data_stream.rb', line 444
def copy_to(start_index = 0, end_index = size, stream, &block)
    return if empty?

    interval = interval_lg
    start_index = if start_index.is_a? Time
                      if interval.first > start_index
                          0
                      else
                          stream_index.sample_number_by_time(start_index)
                      end
                  else
                      if start_index < 0
                          0
                      else
                          start_index
                      end
                  end

    end_index = if end_index.is_a? Time
                    if interval.last < end_index
                        size
                    else
                        stream_index.sample_number_by_time(end_index)
                    end
                else
                    if end_index >= size
                        size
                    else
                        end_index
                    end
                end

    counter = 0
    data_header = seek(start_index, false)
    while sample_index < end_index
        if block
            return false if block.call(counter)
        end
        data_buffer = logfile.data(data_header)
        stream.write_raw(data_header.rt_time, data_header.lg_time, data_buffer)
        counter += 1
        data_header = advance
    end
    counter
end
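The truncation of an oversized interval is easiest to see for integer bounds. The following is a minimal sketch of that rule only; clamp_copy_bounds is a hypothetical helper, not a Pocolog method, and it ignores the Time-based branch.

```ruby
# Hypothetical helper, not part of Pocolog: clamps integer bounds the
# way the copy_to listing does before it starts copying.
def clamp_copy_bounds(start_index, end_index, size)
  start_index = 0 if start_index < 0
  end_index = size if end_index >= size
  [start_index, end_index]
end

# A request that overshoots a 10-sample stream on both sides.
bounds = clamp_copy_bounds(-5, 1_000, 10)
# A request that already fits is left untouched.
unchanged = clamp_copy_bounds(2, 4, 10)
```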
#data(data_header = nil) ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 306
def data(data_header = nil)
    Typelib.to_ruby(raw_data(data_header))
end
#data_header ⇒ Object
The data header for the current sample. You can store a copy of this header to retrieve data later on with #data:
# Don't forget to duplicate !
stored_header = stream.data_header.dup
...
data = stream.data(stored_header)
# File 'lib/pocolog/data_stream.rb', line 232
def data_header; logfile.data_header end
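The advice to duplicate the header suggests that the same header object is reused from one read to the next. The sketch below illustrates that assumption with a hypothetical stand-in (ReusingStream and ToyHeader are not Pocolog classes) and a small helper, collect_headers, which relies only on #each_block and #data_header.

```ruby
# Hypothetical stand-ins, NOT part of Pocolog. ReusingStream assumes the
# header object is mutated in place on every read.
ToyHeader = Struct.new(:lg)

class ReusingStream
  def initialize(times)
    @times = times
    @header = ToyHeader.new(nil)
  end

  def data_header
    @header
  end

  def each_block(rewind = true)
    @times.each do |t|
      @header.lg = t
      yield
    end
  end
end

# Stores one duplicated header per block.
def collect_headers(stream)
  headers = []
  stream.each_block { headers << stream.data_header.dup }
  headers
end

stream = ReusingStream.new([1, 2, 3])

aliased = []
stream.each_block { aliased << stream.data_header } # no dup: same object
copied = collect_headers(stream)
```

Without the dup, every stored entry refers to the same object and therefore reflects only the last block read.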
#duration_lg ⇒ Float
Returns this stream’s duration in seconds
# File 'lib/pocolog/data_stream.rb', line 216
def duration_lg
    if empty?
        0
    else
        interval = interval_lg
        interval[1] - interval[0]
    end
end
#each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream, converting the typelib data
# File 'lib/pocolog/data_stream.rb', line 159
def each(rewind: true)
    return enum_for(__method__, rewind: rewind) unless block_given?

    raw_each(rewind: rewind) do |rt, lg, sample|
        yield(rt, lg, Typelib.to_ruby(sample))
    end
end
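Because #each yields (rt, lg, sample) triples, filtering a stream needs nothing beyond that signature. The sketch below is an illustration under that assumption: logical_times_where is a hypothetical helper, and FakeStream is a stand-in used only so the example runs without a log file.

```ruby
# Hypothetical helper: collects the logical timestamps of the samples
# for which the caller's block returns true. `stream` is any object
# whose #each yields (rt, lg, sample).
def logical_times_where(stream)
  times = []
  stream.each do |_rt, lg, sample|
    times << lg if yield(sample)
  end
  times
end

# Stand-in for a DataStream, NOT part of Pocolog.
FakeStream = Struct.new(:rows) do
  def each(rewind: true)
    rows.each { |rt, lg, sample| yield(rt, lg, sample) }
  end
end

t0 = Time.at(0)
stream = FakeStream.new(
  [[t0, t0, 1.5], [t0 + 1, t0 + 1, -2.0], [t0 + 2, t0 + 2, 3.0]]
)
positive_times = logical_times_where(stream) { |sample| sample > 0 }
```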
#each_block(rewind = true) { ... } ⇒ Object
Enumerates the blocks of this stream
# File 'lib/pocolog/data_stream.rb', line 124
def each_block(rewind = true)
    return enum_for(__method__, rewind) unless block_given?

    self.rewind if rewind
    yield while advance
end
#empty? ⇒ Boolean
True if the size of this stream is zero
# File 'lib/pocolog/data_stream.rb', line 240
def empty?; size == 0 end
#eof? ⇒ Boolean
True if we read past the last sample
# File 'lib/pocolog/data_stream.rb', line 238
def eof?; size == sample_index end
#first ⇒ Object
call-seq:
first => [time_rt, time_lg, data]
Returns the first sample in the stream, or nil if the stream is empty
It differs from #rewind as it always decodes the data payload.
After a call to #first, #sample_index is 0
# File 'lib/pocolog/data_stream.rb', line 333
def first
    rewind
    self.next
end
#from_logical_time(time) ⇒ Object
Return a new DataStream from which all samples before the given logical time are removed.
# File 'lib/pocolog/data_stream.rb', line 59
def from_logical_time(time)
    updated_datastream_from_index(stream_index.remove_before(time))
end
#interval_lg ⇒ (Time,Time), ()
Return the logical time of the first and last samples in this stream
# File 'lib/pocolog/data_stream.rb', line 196
def interval_lg
    info.interval_lg.map { |t| StreamIndex.time_from_internal(t, 0) }
end
#interval_rt ⇒ (Time,Time), ()
Return the realtime of the first and last samples in this stream
# File 'lib/pocolog/data_stream.rb', line 188
def interval_rt
    info.interval_rt.map { |t| StreamIndex.time_from_internal(t, 0) }
end
#last ⇒ Object
call-seq:
last => [time_rt, time_lg, data]
Returns the last sample in the stream, or nil if the stream is empty.
After a call to #last, #sample_index is size - 1
# File 'lib/pocolog/data_stream.rb', line 344
def last
    @sample_index = size - 2
    self.next
end
#next ⇒ Object
call-seq:
next => [time_rt, time_lg, data]
Reads the next sample in the file, and returns it. It differs from #advance as it always decodes the data sample.
# File 'lib/pocolog/data_stream.rb', line 411
def next
    header = advance
    if header
        return [header.rt, header.lg, data]
    end
end
#open ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 105
def open
    logfile.open
end
#previous ⇒ Object
call-seq:
previous => [time_rt, time_lg, data]
Reads the previous sample in the file, and returns it.
# File 'lib/pocolog/data_stream.rb', line 422
def previous
    if sample_index < 0
        # Just rewind, never played
        return nil
    elsif sample_index == 0
        # Beginning of file reached
        rewind
        return nil
    else
        seek(sample_index - 1)
    end
end
#raw_data(data_header = nil, sample = nil) ⇒ Object
Returns the decoded data sample associated with the given block header.
Block headers are returned by #rewind
# File 'lib/pocolog/data_stream.rb', line 270
def raw_data(data_header = nil, sample = nil)
    if(@data && !data_header) then @data
    else
        data_header ||= logfile.data_header
        marshalled_data = logfile.data(data_header)
        data = sample || type.new
        data.from_buffer_direct(marshalled_data)
        if logfile.endian_swap
            data = data.endian_swap
        end
        data
    end
rescue Interrupt
    raise
rescue Exception => e
    raise e, "failed to unmarshal sample in block at position #{data_header.block_pos}: #{e.message}", e.backtrace
end
#raw_each(rewind: true) {|rt, lg, sample| ... } ⇒ Object
Enumerates the samples of this stream without converting the typelib data
# File 'lib/pocolog/data_stream.rb', line 140
def raw_each(rewind: true)
    return enum_for(__method__, rewind: rewind) unless block_given?

    each_block(rewind) do
        data_block = data_header
        yield(data_block.rt, data_block.lg, raw_data(data_block))
    end
end
#read_one_data_sample(position) ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 288
def read_one_data_sample(position)
    Typelib.to_ruby(read_one_raw_data_sample(position))
end
#read_one_raw_data_sample(position, sample = nil) ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 292
def read_one_raw_data_sample(position, sample = nil)
    block_pos = stream_index.file_position_by_sample_number(position)
    marshalled_data = logfile.read_one_data_payload(block_pos)
    data = sample || type.new
    data.from_buffer_direct(marshalled_data)
    data = data.endian_swap if logfile.endian_swap
    data
rescue Interrupt
    raise
rescue StandardError => e
    raise e, "failed to unmarshal sample in block at position #{block_pos}: "\
             "#{e.message}", e.backtrace
end
#registry ⇒ Object
Get the Typelib::Registry object for this stream
# File 'lib/pocolog/data_stream.rb', line 243
def registry
    type.registry
end
#resample_by_index(samples) ⇒ Object
Return a new DataStream with only the N-th sample
# File 'lib/pocolog/data_stream.rb', line 70
def resample_by_index(samples)
    updated_datastream_from_index(stream_index.resample_by_index(samples))
end
#resample_by_time(period, start_time: nil) ⇒ Object
Return a new DataStream resampled by time, using the given period.
# File 'lib/pocolog/data_stream.rb', line 78
def resample_by_time(period, start_time: nil)
    updated_datastream_from_index(
        stream_index.resample_by_time(period, start_time: start_time)
    )
end
#rewind ⇒ Object
call-seq:
rewind => data_header
Goes to the first sample in the stream, and returns its header. Returns nil if the stream is empty. Note that the source listed here only resets #sample_index to -1 and returns nil; the first sample is actually read by the following call to #advance or #next.
It differs from #first as it does not decode the data payload.
# File 'lib/pocolog/data_stream.rb', line 317
def rewind
    # The first sample in the file has index 0, so set sample_index to
    # -1 so that (@sample_index += 1) sets the index to 0 for the first
    # sample
    @sample_index = -1
    nil
end
#samples(read_data = true) ⇒ Object
Returns a SampleEnumerator object for this stream
# File 'lib/pocolog/data_stream.rb', line 114
def samples(read_data = true); SampleEnumerator.new(self, read_data) end
#samples?(start_index, end_index) ⇒ Boolean
call-seq:
samples?(pos1,pos2) => true
samples?(time1,time2) => true
Returns true if stream samples lie inside the given time or position interval.
# File 'lib/pocolog/data_stream.rb', line 494
def samples?(start_index,end_index)
    if end_index < start_index
        raise ArgumentError, "end bound in sample interval smaller than start bound"
    elsif start_index.is_a? Time
        if start_index > end_index
            raise ArgumentError, "end bound in sample interval smaller than start bound"
        elsif empty?
            return
        end

        start_t, end_t = interval_lg
        start_index <= end_t && start_t <= end_index
    elsif start_index < 0
        raise ArgumentError, "negative start index"
    else
        start_index < size
    end
end
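For Time arguments, the check in the listing is a closed-interval overlap test between the requested interval and the stream's logical interval. As a minimal standalone sketch (intervals_overlap? is a hypothetical helper, not a Pocolog method):

```ruby
# Hypothetical helper: the closed-interval overlap test used by the
# Time branch of samples?, written as a standalone function.
def intervals_overlap?(query_start, query_end, stream_start, stream_end)
  query_start <= stream_end && stream_start <= query_end
end

stream_start = Time.at(100)
stream_end = Time.at(200)

# Entirely inside the stream's interval.
inside = intervals_overlap?(Time.at(150), Time.at(160), stream_start, stream_end)
# Touches the stream's first sample time exactly.
touching = intervals_overlap?(Time.at(50), Time.at(100), stream_start, stream_end)
# Starts after the stream's last sample time.
outside = intervals_overlap?(Time.at(201), Time.at(300), stream_start, stream_end)
```

Because both comparisons are inclusive, an interval that only touches one end of the stream still counts as overlapping.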
#seek(pos, decode_data = true) ⇒ Object
Seek the stream at the given position.
If pos is a Time object, seeks to the last sample whose logical time is not greater than pos.
If pos is an integer, it is interpreted as an index and the stream goes to the sample that has this index.
Returns [rt, lg, data] for the current sample (if there is one), and nil otherwise.
# File 'lib/pocolog/data_stream.rb', line 359
def seek(pos, decode_data = true)
    if pos.kind_of?(Time)
        interval_lg = self.interval_lg
        return nil if interval_lg.empty? || interval_lg[0] > pos || interval_lg[1] < pos
        @sample_index = stream_index.sample_number_by_time(pos)
    else
        @sample_index = pos
    end

    file_pos = stream_index.file_position_by_sample_number(@sample_index)
    block_info = logfile.read_one_block(file_pos)
    if block_info.stream_index != self.index
        block_stream_name = logfile.stream_from_index(block_info.stream_index).name
        raise InternalError,
              "index returned index=#{@sample_index} and pos=#{file_pos} as "\
              "position for seek(#{pos}) but it seems to be a sample in "\
              "stream #{block_info.stream_index} (#{block_stream_name}} while "\
              "we were expecting #{index} (#{name})"
    end

    if header = self.data_header
        header = header.dup
        if decode_data
            data = self.data(header)
            return [header.rt, header.lg, data]
        else
            header
        end
    end
end
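The rule for Time positions ("the last sample whose logical time is not greater than pos") can be sketched as a binary search over sorted timestamps. This is only an illustration of the documented rule: the listing delegates the lookup to stream_index.sample_number_by_time, whose implementation is not shown here, and last_index_not_after is a hypothetical helper.

```ruby
# Hypothetical helper, not Pocolog's implementation: given timestamps
# sorted in ascending order, returns the index of the last one that is
# not greater than `pos`, or nil if every timestamp is after `pos`.
def last_index_not_after(times, pos)
  # Index of the first timestamp strictly after pos, or size if none is.
  first_after = times.bsearch_index { |t| t > pos } || times.size
  first_after.zero? ? nil : first_after - 1
end

times = [10, 20, 20, 30].map { |s| Time.at(s) }

exact = last_index_not_after(times, Time.at(20))   # hits a duplicated time
between = last_index_not_after(times, Time.at(25)) # falls between samples
before = last_index_not_after(times, Time.at(5))   # earlier than all samples
after = last_index_not_after(times, Time.at(40))   # later than all samples
```

Note that the listing itself returns nil when pos falls outside the stream's logical interval, so the "later than all samples" case of this sketch does not correspond to what seek does.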
#size ⇒ Object
The size, in samples, of data in this stream
# File 'lib/pocolog/data_stream.rb', line 235
def size; info.size end
#stream_index ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 97
def stream_index
    info.index
end
#sub_field(fieldname, data_header = nil) ⇒ Object
Returns the decoded subfield specified by ‘fieldname’ for the given data header. If no header is given, the current last read data header is used
# File 'lib/pocolog/data_stream.rb', line 250
def sub_field(fieldname, data_header = nil)
    header = data_header || logfile.data_header
    if( header.compressed )
        data(data_header).send(fieldname)
    elsif(type.is_a?(Typelib::CompoundType) and type.has_field?(fieldname))
        offset = type.offset_of(fieldname)
        subtype = type[fieldname]
        rawData = logfile.sub_field(offset, subtype.size, data_header)
        wrappedType = subtype.wrap(rawData)
        rubyType = Typelib.to_ruby(wrappedType)
        rubyType
    else
        nil
    end
end
#time ⇒ Object
Returns the time of the current sample
# File 'lib/pocolog/data_stream.rb', line 175
def time
    header = logfile.data_header
    if !time_getter
        [header.rt, header.lg]
    else
        [header.rt, time_getter[data(header)]]
    end
end
#time_interval(rt = false) ⇒ Object
Get the logical time of first and last samples in this stream. If rt is true, returns the interval for the wall-clock time.
Returns nil if the stream is empty.
This method is deprecated; use #interval_lg or #interval_rt instead.
# File 'lib/pocolog/data_stream.rb', line 204
def time_interval(rt = false)
    Pocolog.warn_deprecated "Pocolog::DataStream#time_interval is deprecated, use #interval_lg or #interval_rt instead"
    if rt
        interval_rt
    else
        interval_lg
    end
end
#to_logical_time(time) ⇒ Object
Return a new DataStream from which all samples after the given logical time are removed.
# File 'lib/pocolog/data_stream.rb', line 65
def to_logical_time(time)
    updated_datastream_from_index(stream_index.remove_after(time))
end
#type_name ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 24
def type_name
    type.name
end
#typename ⇒ Object
# File 'lib/pocolog/data_stream.rb', line 20
def typename
    type.name
end
#updated_datastream_from_index(stream_index) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Re-creates a “copy” of this new datastream using an updated index
This makes it possible to create a view of the original stream by modifying the index.
# File 'lib/pocolog/data_stream.rb', line 90
def updated_datastream_from_index(stream_index)
    info = StreamInfo.from_raw_data(
        [], @info.interval_rt, stream_index.base_time, stream_index.index_map
    )
    self.class.new(@logfile, index, @name, @type, @metadata, info)
end
#write(rt, lg, data) ⇒ Object
Write a sample in this stream, with the rt and lg timestamps. data can be either a Typelib::Type object of the right type, or a String (in which case we consider that it is the raw data).
# File 'lib/pocolog/data_stream.rb', line 516
def write(rt, lg, data)
    data = Typelib.from_ruby(data, type)
    write_raw(rt, lg, data.to_byte_array)
end
#write_raw(rt, lg, data) ⇒ Object
Write an already marshalled sample. data is supposed to be a typelib-marshalled value of the stream type.
# File 'lib/pocolog/data_stream.rb', line 523
def write_raw(rt, lg, data)
    logfile.write_data_block(self, rt, lg, data)
end