Class: Roby::Log::RemoteStream

Inherits:
DataStream show all
Defined in:
lib/roby/log/server.rb

Overview

This class manages a data stream which is present remotely. Data is sent as-is over the network from a Server object to a Client object.

Instance Attribute Summary collapse

Attributes inherited from DataStream

#decoders, #id, #name, #type

Instance Method Summary collapse

Methods inherited from DataStream

#==, #advance, #clear, #clear_integrated, #close, #decoder, #display, #displayed?, #eql?, #hash, #open, open, #read_all, #read_and_decode, #reinit?, #to_s

Constructor Details

#initialize(stream_model, id, name, type) ⇒ RemoteStream

Returns a new instance of RemoteStream.



311
312
313
314
315
316
317
318
319
320
321
# File 'lib/roby/log/server.rb', line 311

# Builds the local handle for a remote stream.
#
# +stream_model+ is kept as-is and is what #decode and #init delegate to.
# +id+ is stored in @id. +name+ and +type+ are forwarded to the superclass
# constructor and are also used to build the basename of the temporary file
# in which the received data is saved.
def initialize(stream_model, id, name, type)
  super(name, type)
  @id = id
  @stream_model = stream_model

  # "/" is not allowed in a file basename, so it is replaced by "_"
  @data_file = Tempfile.new("remote_stream_#{name}_#{type}".tr("/", "_"))
  # The file holds raw length-prefixed chunks: switch to binary mode so that
  # no newline or encoding translation is ever applied to them
  @data_file.binmode
  @data_file.sync = true

  @mutex = Mutex.new
  @pending_samples = []
end

Instance Attribute Details

#current_time ⇒ Object (readonly)

Returns the value of attribute current_time.



375
376
377
# File 'lib/roby/log/server.rb', line 375

# Reader for @current_time, which #push sets when it is still unset and
# which #read overwrites with the time of the sample it returns.
def current_time; @current_time; end

#data_file ⇒ Object (readonly)

The data file in which we save the data received so far



325
326
327
# File 'lib/roby/log/server.rb', line 325

# Reader for @data_file, the file in which the data received so far is saved.
def data_file; @data_file; end

#stream_model ⇒ Object (readonly)

The DataStream class of the remote stream. This is used for decoding



328
329
330
# File 'lib/roby/log/server.rb', line 328

# Reader for @stream_model, the object #decode and #init delegate to.
def stream_model; @stream_model; end

Instance Method Details

#added_decoder(dec) ⇒ Object



330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/roby/log/server.rb', line 330

# Replays everything saved in the data file into +dec+.
#
# The first record of the file is handed to #init and every sample it
# yields is given to dec.process. Each following record is decoded with
# #decode and given to dec.process as well. #display is called last.
# Nothing is replayed (and #display is not called) if the file is empty.
# The whole operation runs inside #synchronize.
def added_decoder(dec)
  # One record is a 4-byte big-endian length followed by that many bytes
  next_chunk = lambda do
    length = data_file.read(4).unpack("N").first
    data_file.read(length)
  end

  synchronize do
    Server.info "#{self} initializing #{dec}"
    return if data_file.stat.size.zero?

    data_file.rewind
    init(next_chunk.call) { |sample| dec.process(sample) }
    dec.process(decode(next_chunk.call)) until data_file.eof?

    display
  end
end

#decode(data) ⇒ Object



410
411
412
# File 'lib/roby/log/server.rb', line 410

# Decodes one chunk of raw data by delegating to stream_model.decode and
# returns whatever that call returns.
def decode(data)
  model = stream_model
  model.decode(data)
end

#has_sample? ⇒ Boolean

Returns:

  • (Boolean)


389
390
391
392
393
# File 'lib/roby/log/server.rb', line 389

# True if at least one pushed sample has not been read yet. The check is
# done inside #synchronize.
def has_sample?
  synchronize { !@pending_samples.empty? }
end

#init(data, &block) ⇒ Object



404
405
406
407
408
409
# File 'lib/roby/log/server.rb', line 404

# Saves the initialization chunk +data+ as the first record of the data
# file, then hands it to stream_model.init along with the given block.
# Returns what stream_model.init returns.
#
# Records are framed as a 4-byte big-endian length followed by the payload.
# The length is the payload's size in *bytes* since readers fetch it back
# with IO#read(length), which counts bytes and not characters.
def init(data, &block)
  Server.info "#{self} initializing with #{data.bytesize} bytes of data"
  # The file is rewound but deliberately not truncated: #added_decoder calls
  # this method while replaying the file, and still needs the records that
  # follow the first one.
  data_file.rewind
  data_file << [data.bytesize].pack("N") << data
  stream_model.init(data, &block)
end

#next_time ⇒ Object



377
378
379
380
381
382
383
# File 'lib/roby/log/server.rb', line 377

# Returns the time of the sample the next call to #read will return, or nil
# if there is no pending sample.
#
# #push inserts [time, data] pairs at the front of @pending_samples and
# #read pops them from the back, so the next sample is the last element.
def next_time
  synchronize do
    # Look at the array directly instead of calling #has_sample?: that
    # method takes the same mutex, and Mutex is not reentrant.
    next_sample = @pending_samples.last
    next_sample.first if next_sample
  end
end

#push(time, data) ⇒ Object

Called when new data is available



363
364
365
366
367
368
369
370
371
372
373
# File 'lib/roby/log/server.rb', line 363

# Called when new data is available.
#
# Inside #synchronize: updates @range (start is set once, end is always
# set to +time+), sets @current_time if it is still unset, queues the
# [time, data] pair at the front of @pending_samples and appends the data
# to the data file.
#
# Records are framed as a 4-byte big-endian length followed by the payload.
# The length is the payload's size in *bytes* since readers fetch it back
# with IO#read(length), which counts bytes and not characters.
def push(time, data)
  Server.info "#{self} got #{data.bytesize} bytes of data at #{time.to_hms}"
  synchronize do
    @range[0] ||= time
    @range[1] = time
    @current_time ||= time

    @pending_samples.unshift [time, data]
    data_file << [data.bytesize].pack("N") << data
  end
end

#range ⇒ Object



385
386
387
# File 'lib/roby/log/server.rb', line 385

# Same as the superclass implementation, but evaluated inside #synchronize.
def range
  synchronize do
    super
  end
end

#read ⇒ Object



395
396
397
398
399
400
401
402
# File 'lib/roby/log/server.rb', line 395

# Returns the data of the oldest pending sample and stores its time in
# @current_time. Both are nil when no sample is pending. #reinit! is called
# first if #reinit? says so.
#
# NOTE(review): unlike #push, this method does not go through #synchronize;
# confirm that callers serialize access to @pending_samples.
def read
  reinit! if reinit?

  stamp, sample = @pending_samples.pop
  @current_time = stamp
  sample
end

#reinit! ⇒ Object



354
355
356
357
358
359
360
# File 'lib/roby/log/server.rb', line 354

# Drops everything received so far: empties the data file, clears the
# pending samples and resets @current_time, then calls the superclass
# implementation and returns its result.
def reinit!
  data_file.truncate(0)
  # truncate does not move the file offset. Without the rewind, the next
  # write would land at the old offset and leave a zero-filled gap at the
  # beginning of the file.
  data_file.rewind
  @pending_samples.clear
  @current_time = nil

  super
end

#synchronize ⇒ Object



322
# File 'lib/roby/log/server.rb', line 322

# Runs the given block while holding @mutex and returns the block's value.
def synchronize
  @mutex.synchronize do
    yield
  end
end