Class: Roby::Log::EventStream

Inherits:
DataStream show all
Defined in:
lib/roby/log/event_stream.rb

Overview

This class is a logger-compatible interface which reads event and index logs, and may rebuild the task and event graphs from the marshalled events that are saved using, for instance, FileLogger.

Instance Attribute Summary collapse

Attributes inherited from DataStream

#decoders, #id, #name, #type

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from DataStream

#==, #added_decoder, #advance, #clear, #clear_integrated, #decode, #decoder, #display, #displayed?, #eql?, #hash, #init, open, #read_and_decode, #to_s

Constructor Details

#initialize(basename, file = nil) ⇒ EventStream

Returns a new instance of EventStream.



22
23
24
25
26
27
28
# File 'lib/roby/log/event_stream.rb', line 22

# Builds an event stream called +basename+.
#
# The parent constructor receives +basename+ and the literal "roby-events"
# (presumably the stream type -- confirm against DataStream#initialize).
#
# basename:: name handed to the parent constructor
# file::     optional log object. When given, it becomes the stream's
#            logfile and the stream is reinitialized immediately; when
#            omitted, no logfile is assigned here.
def initialize(basename, file = nil)
  super(basename, "roby-events")
  return unless file

  @logfile = file
  reinit!
end

Instance Attribute Details

#current_cycle ⇒ Object (readonly)

The index of the currently displayed cycle in index_data



15
16
17
# File 'lib/roby/log/event_stream.rb', line 15

# Reader for @current_cycle, the index of the currently displayed cycle in
# index_data.
def current_cycle; @current_cycle end

#logfile ⇒ Object (readonly)

The event log



12
13
14
# File 'lib/roby/log/event_stream.rb', line 12

# Reader for @logfile, the event log this stream reads from.
def logfile; @logfile end

#start_cycle ⇒ Object (readonly)

The index of the first non-empty cycle



17
18
19
# File 'lib/roby/log/event_stream.rb', line 17

# Reader for @start_cycle, the index of the first non-empty cycle.
def start_cycle; @start_cycle end

Class Method Details

.decode(data) ⇒ Object

Unmarshalls one cycle of data, as returned by #read, and returns the result so that it can be fed to the decoders



133
134
135
# File 'lib/roby/log/event_stream.rb', line 133

# Unmarshals one chunk of marshalled data and returns the resulting object.
#
# data:: a String holding exactly one Marshal dump
#
# NOTE(review): Marshal.load can instantiate arbitrary objects, so +data+
# must come from a trusted log file.
def self.decode(data)
  Marshal.load(data)
end

.init(data) ⇒ Object

Unmarshalls a set of data returned by #read_all and yields each sample that should be fed to the decoders



123
124
125
126
127
128
129
# File 'lib/roby/log/event_stream.rb', line 123

# Walks through +data+, a String made of back-to-back Marshal dumps, and
# yields every unmarshalled object in order.
#
# data:: concatenated marshalled samples
#
# Returns nil. An EOFError raised while unmarshalling stops the iteration
# silently; samples decoded before that point have already been yielded.
def self.init(data)
  stream = StringIO.new(data)
  yield(Marshal.load(stream)) until stream.eof?
rescue EOFError
  # Best effort: stop at the point where the data ran out.
end

Instance Method Details

#close ⇒ Object



34
# File 'lib/roby/log/event_stream.rb', line 34

# Closes the underlying event log, if there is one.
#
# A stream constructed without a file and never opened has no logfile;
# closing it is a no-op rather than a NoMethodError on nil.
#
# Returns whatever the logfile's #close returns, or nil when there is no
# logfile.
def close
  @logfile.close if @logfile
end

#current_time ⇒ Object

The current time



80
81
82
83
84
85
86
87
# File 'lib/roby/log/event_stream.rb', line 80

# Time associated with the current cycle.
#
# Returns nil when the index is empty. Otherwise the result is built from
# the :start entry of the current cycle (splatted into Time.at). When the
# current cycle is the last indexed one, its :end value is added on top
# (presumably a duration in seconds, since it is added to a Time --
# confirm against the index writer).
def current_time
  return if index_data.empty?

  stamp = Time.at(*index_data[current_cycle][:start])
  on_last_cycle = (index_data.size == current_cycle + 1)
  stamp += index_data[current_cycle][:end] if on_last_cycle
  stamp
end

#has_sample? ⇒ Boolean

True if there is at least one sample available

Returns:

  • (Boolean)


58
59
60
61
# File 'lib/roby/log/event_stream.rb', line 58

# Tells whether at least one more sample can be read.
#
# The logfile's index is refreshed first. A sample is available when the
# index is non-empty and the position of its last entry lies beyond the
# logfile's current read position.
def has_sample?
  logfile.update_index
  return false if index_data.empty?

  index_data.last[:pos] > logfile.tell
end

#index_data ⇒ Object



36
# File 'lib/roby/log/event_stream.rb', line 36

# Shortcut to the index kept by the logfile. The rest of this class
# treats it as an ordered, array-like list with one entry per cycle.
def index_data
  logfile.index_data
end

#next_time ⇒ Object

The time we will reach when the next sample is processed



90
91
92
93
94
95
# File 'lib/roby/log/event_stream.rb', line 90

# Start time of the cycle following the current one, i.e. the time that
# will be reached once the next sample is processed.
#
# Returns nil when the index is empty or when there is no cycle after the
# current one.
def next_time
  # Checked first so that current_cycle is never touched on an empty index.
  return if index_data.empty?

  upcoming = current_cycle + 1
  return unless index_data.size > upcoming

  Time.at(*index_data[upcoming][:start])
end

#open ⇒ Object



29
30
31
32
33
# File 'lib/roby/log/event_stream.rb', line 29

# Opens the log associated with this stream's name through Roby::Log.open,
# keeps it as the stream's logfile and reinitializes the stream.
#
# Returns self, so calls can be chained.
def open
  opened_log = Roby::Log.open(name)
  @logfile = opened_log
  reinit!
  self
end

#prepare_seek(time) ⇒ Object

Seek the data stream to the specified time.



64
65
66
67
68
69
70
71
72
# File 'lib/roby/log/event_stream.rb', line 64

# Prepares the stream for a seek to +time+.
#
# Nothing happens when +time+ is at or after the current time. When
# +time+ is nil, when there is no current time, or when +time+ lies before
# the current time, the stream is cleared, the current cycle is reset to
# start_cycle and the logfile is rewound.
#
# time:: target time, or nil to force a reset
def prepare_seek(time)
  # Same short-circuit order as the reset condition: current_time is only
  # consulted when a target time was given.
  return if time && current_time && !(time < current_time)

  clear

  # NOTE(review): nothing visible in this class reads @current_time;
  # current_time is computed from the index instead.
  @current_time  = nil
  @current_cycle = start_cycle
  logfile.rewind
end

#range ⇒ Object

A [min, max] array of the minimum and maximum times for this stream



20
# File 'lib/roby/log/event_stream.rb', line 20

# Time span covered by this stream, as a two-element array: start_time
# followed by the last element of the logfile's own range.
def range
  lower = start_time
  upper = logfile.range.last
  [lower, upper]
end

#read ⇒ Object

Reads a sample of data and returns it. It will be fed to decoders’ #decode method.

In this stream, this is the chunk of the marshalled file which corresponds to a cycle



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/roby/log/event_stream.rb', line 102

# Reads the raw chunk of the logfile that corresponds to the current cycle
# and returns it.
#
# The chunk runs from the current cycle's :pos to the :pos of the next
# indexed cycle, or to the end of the file when the current cycle is the
# last one. If reinit? reports true, the stream is reinitialized first.
#
# The current cycle index is incremented in an ensure clause, so it
# advances by one on every call, including when reading raises.
def read
  reinit! if reinit?

  chunk_start = index_data[current_cycle][:pos]
  chunk_end =
    if index_data.size > current_cycle + 1
      index_data[current_cycle + 1][:pos]
    else
      logfile.stat.size
    end

  logfile.seek(chunk_start)
  logfile.read(chunk_end - chunk_start)
ensure
  @current_cycle += 1
end

#read_all ⇒ Object

Reads all the data read so far, in a format suitable for feeding to #init_stream on the decoding side



139
140
141
142
143
144
145
146
147
# File 'lib/roby/log/event_stream.rb', line 139

# Returns everything from the beginning of the logfile up to the :pos of
# the cycle following the current one, or up to the end of the file when
# no such cycle is indexed.
#
# The logfile is rewound before reading, so its read position ends up at
# the end of the returned data.
def read_all
  stop_at =
    if index_data.size > current_cycle + 1
      index_data[current_cycle + 1][:pos]
    else
      logfile.stat.size
    end

  logfile.rewind
  logfile.read(stop_at)
end

#reinit! ⇒ Object

Reinitializes the stream



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/roby/log/event_stream.rb', line 44

# Reinitializes the stream.
#
# The stream is first reset through prepare_seek(nil), then the parent
# implementation runs. Finally both the start cycle and the current cycle
# are set to the first cycle whose :event_count differs from 4, or to
# index_data.size when every indexed cycle has exactly 4 events.
def reinit!
  prepare_seek(nil)

  super

  # Cycles with exactly this many events are skipped at the start of the
  # stream (presumably the per-cycle bookkeeping events only -- confirm
  # against the logger).
  idle_event_count = 4
  first_busy = (0...index_data.size).find do |cycle|
    index_data[cycle][:event_count] != idle_event_count
  end
  first_busy ||= index_data.size

  @start_cycle   = first_busy
  @current_cycle = first_busy
end

#reinit? ⇒ Boolean

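True if the underlying log has been reinitialized, i.e. the log file is now smaller than the position of the last indexed cycle. Once true, the value is remembered.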

Returns:

  • (Boolean)


39
40
41
# File 'lib/roby/log/event_stream.rb', line 39

# Tells whether the stream has to be reinitialized.
#
# This is the case when the index is non-empty and the logfile is now
# smaller than the position of the last indexed cycle. A truthy answer is
# remembered in @reinit and returned directly on later calls; a false
# answer is recomputed every time.
def reinit?
  return @reinit if @reinit

  @reinit = !index_data.empty? && logfile.stat.size < index_data.last[:pos]
end

#splat? ⇒ Boolean

Returns:

  • (Boolean)


9
# File 'lib/roby/log/event_stream.rb', line 9

# Always true for this stream.
# NOTE(review): what "splat" means to callers is not visible in this
# class -- see DataStream and its users.
def splat?
  true
end

#start_time ⇒ Object



74
75
76
77
# File 'lib/roby/log/event_stream.rb', line 74

# Start time of the first non-empty cycle, built from that cycle's :start
# entry (splatted into Time.at).
#
# Returns nil when start_cycle equals index_data.size, i.e. when there is
# no indexed cycle at that position.
def start_time
  unless start_cycle == index_data.size
    Time.at(*index_data[start_cycle][:start])
  end
end