Class: Roby::Log::RemoteStream
Overview
This class manages a data stream which is present remotely. Data is sent as-is over the network from a Server object to a Client object.
Instance Attribute Summary collapse
Attributes inherited from DataStream
#decoders, #id, #name, #type
Instance Method Summary
collapse
Methods inherited from DataStream
#==, #advance, #clear, #clear_integrated, #close, #decoder, #display, #displayed?, #eql?, #hash, #open, open, #read_all, #read_and_decode, #reinit?, #to_s
Constructor Details
#initialize(stream_model, id, name, type) ⇒ RemoteStream
Returns a new instance of RemoteStream.
311
312
313
314
315
316
317
318
319
320
321
|
# File 'lib/roby/log/server.rb', line 311
def initialize(stream_model, id, name, type)
super(name, type)
@id = id
@stream_model = stream_model
@data_file = Tempfile.new("remote_stream_#{name}_#{type}".gsub("/", "_"))
@data_file.sync = true
@mutex = Mutex.new
@pending_samples = Array.new
end
|
Instance Attribute Details
#current_time ⇒ Object
Returns the value of attribute current_time.
375
376
377
|
# File 'lib/roby/log/server.rb', line 375
def current_time
@current_time
end
|
#data_file ⇒ Object
The data file in which we save the data received so far
325
326
327
|
# File 'lib/roby/log/server.rb', line 325
def data_file
@data_file
end
|
#stream_model ⇒ Object
The DataStream class of the remote stream. This is used for decoding
328
329
330
|
# File 'lib/roby/log/server.rb', line 328
def stream_model
@stream_model
end
|
Instance Method Details
#added_decoder(dec) ⇒ Object
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
|
# File 'lib/roby/log/server.rb', line 330
def added_decoder(dec)
synchronize do
Server.info "#{self} initializing #{dec}"
if data_file.stat.size == 0
return
end
data_file.rewind
chunk_length = data_file.read(4).unpack("N").first
chunk = data_file.read(chunk_length)
init(chunk) do |sample|
dec.process(sample)
end
while !data_file.eof?
chunk_length = data_file.read(4).unpack("N").first
chunk = data_file.read(chunk_length)
dec.process(decode(chunk))
end
display
end
end
|
#decode(data) ⇒ Object
410
411
412
|
# File 'lib/roby/log/server.rb', line 410
def decode(data)
stream_model.decode(data)
end
|
#has_sample? ⇒ Boolean
389
390
391
392
393
|
# File 'lib/roby/log/server.rb', line 389
def has_sample?
synchronize do
!@pending_samples.empty?
end
end
|
#init(data, &block) ⇒ Object
404
405
406
407
408
409
|
# File 'lib/roby/log/server.rb', line 404
def init(data, &block)
Server.info "#{self} initializing with #{data.size} bytes of data"
data_file.rewind
data_file << [data.size].pack("N") << data
stream_model.init(data, &block)
end
|
#next_time ⇒ Object
377
378
379
380
381
382
383
|
# File 'lib/roby/log/server.rb', line 377
def next_time
synchronize do
if has_sample?
@pending_samples.first
end
end
end
|
#push(time, data) ⇒ Object
Called when new data is available
363
364
365
366
367
368
369
370
371
372
373
|
# File 'lib/roby/log/server.rb', line 363
def push(time, data)
Server.info "#{self} got #{data.size} bytes of data at #{time.to_hms}"
synchronize do
@range[0] ||= time
@range[1] = time
@current_time ||= time
@pending_samples.unshift [time, data]
data_file << [data.size].pack("N") << data
end
end
|
385
386
387
|
# File 'lib/roby/log/server.rb', line 385
def range
synchronize { super }
end
|
395
396
397
398
399
400
401
402
|
# File 'lib/roby/log/server.rb', line 395
def read
if reinit?
reinit!
end
@current_time, sample = @pending_samples.pop
sample
end
|
354
355
356
357
358
359
360
|
# File 'lib/roby/log/server.rb', line 354
def reinit!
data_file.truncate(0)
@pending_samples.clear
@current_time = nil
super
end
|
#synchronize ⇒ Object
322
|
# File 'lib/roby/log/server.rb', line 322
def synchronize; @mutex.synchronize { yield } end
|