Class: MeasReceiver::MeasTypeBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/meas_receiver/meas_type_buffer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_meas_type) ⇒ MeasTypeBuffer

Returns a new instance of MeasTypeBuffer.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/meas_receiver/meas_type_buffer.rb', line 8

# Build an empty buffer bound to one measurement type.
#
# @param _meas_type [#logger, #debug, #name, #coefficients, #after_proc, #storage]
#   measurement type object; its settings are copied into instance variables
def initialize(_meas_type)
  @meas_type = _meas_type

  # Logging setup and identification, taken from the measurement type.
  @logger = _meas_type.logger
  @debug = _meas_type.debug
  @name = _meas_type.name

  # Conversion coefficients, post-add callback and storage settings.
  @coefficients = _meas_type.coefficients
  @after_proc = _meas_type.after_proc
  @storage = _meas_type.storage

  # Buffer index at which the next storage pass starts.
  @storage_last_i = 0
  # Result of the most recent storage pass.
  @storage_buffer = []

  # Raw values and their count.
  @buffer, @size = [], 0

  # Guards buffer mutation (add!, clean_up_to!, perform_storage!).
  @mutex = Mutex.new
end

Instance Attribute Details

#bufferObject

Returns the value of attribute buffer.



28
29
30
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28

# Raw values held by this buffer, in insertion order (add! appends to it).
#
# @return [Array] the live @buffer array, not a copy
def buffer
  return @buffer
end

#coefficientsObject

Returns the value of attribute coefficients.



28
29
30
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28

# Conversion coefficients copied from the measurement type; raw_to_value
# reads the :offset, :linear and :proc entries.
#
# @return [Object] the value of @coefficients
def coefficients
  return @coefficients
end

#nameObject (readonly)

Returns the value of attribute name.



29
30
31
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29

# Name copied from the measurement type when the buffer was created.
#
# @return [Object] the value of @name
def name
  return @name
end

#sizeObject (readonly)

Returns the value of attribute size.



29
30
31
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29

# Number of raw values currently counted in the buffer
# (incremented by add!, reduced by clean_up_to!).
#
# @return [Integer] the value of @size
def size
  return @size
end

#storageObject

Returns the value of attribute storage.



28
29
30
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28

# Storage settings copied from the measurement type. Methods in this class
# read :avg_side_count, :min_unit_interval, :max_unit_interval,
# :value_deviation and :proc from it.
#
# @return [Object] the value of @storage
def storage
  return @storage
end

#storage_bufferObject (readonly)

Returns the value of attribute storage_buffer.



29
30
31
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29

# Measurements selected by the most recent perform_storage! run
# (an empty array until the first run).
#
# @return [Array] the value of @storage_buffer
def storage_buffer
  return @storage_buffer
end

#storage_last_iObject (readonly)

Returns the value of attribute storage_last_i.



29
30
31
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29

# Buffer index from which the next storage pass continues.
#
# @return [Integer] the value of @storage_last_i
def storage_last_i
  return @storage_last_i
end

#time_fromObject

Returns the value of attribute time_from.



28
29
30
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28

# Time assigned to buffer index 0: set on the first add! and moved forward
# by clean_up_to!.
#
# @return [Time, nil] the value of @time_from (nil before the first add!)
def time_from
  return @time_from
end

#time_toObject

Returns the value of attribute time_to.



28
29
30
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28

# Time of the most recent add! call.
#
# @return [Time, nil] the value of @time_to (nil before the first add!)
def time_to
  return @time_to
end

Instance Method Details

#[](i) ⇒ Object



56
57
58
59
# File 'lib/meas_receiver/meas_type_buffer.rb', line 56

# Fetch the measurement at buffer index +i+.
#
# The timestamp is not stored per sample; it is estimated as @time_from plus
# +i+ average intervals.
#
# @param i [Integer] zero-based index into the buffer
# @return [Hash, nil] hash with :time, :raw and :value keys, or nil when
#   +i+ lies outside 0...@size
def [](i)
  # Either bound being violated makes the index invalid, so the checks are
  # joined with ||; one index can never be both negative and past the end.
  return nil if i < 0 || i >= @size

  raw = @buffer[i]
  { time: @time_from + interval * i, raw: raw, value: raw_to_value(raw) }
end

#add!(v) ⇒ Object

add raw value



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/meas_receiver/meas_type_buffer.rb', line 32

# Append one raw value to the buffer and update the covered time span.
#
# The buffer update runs under @mutex; after_add is invoked once the lock
# has been released.
#
# @param v [Object] raw value to append
# @return [Object] whatever after_add returns
def add!(v)
  @mutex.synchronize do
    @buffer.push(v)
    @size += 1

    # The start time is fixed by the first sample; the end time follows
    # every sample.
    @time_from ||= Time.now
    @time_to = Time.now

    if @debug
      @logger.debug("Added #{v.to_s.yellow} to buffer, size #{@size.to_s.blue}")
    end
  end

  after_add
end

#after_addObject

Execute proc using last fetched measurement



46
47
48
49
50
# File 'lib/meas_receiver/meas_type_buffer.rb', line 46

# Run the configured after-add callback with the newest measurement.
#
# @return [Object, nil] the callback's result, or nil when no callback is set
def after_add
  return if @after_proc.nil?

  @after_proc.call(last)
end

#at(_time) ⇒ Object

Search measurement in buffer



84
85
86
87
88
# File 'lib/meas_receiver/meas_type_buffer.rb', line 84

# Look up the measurement closest to a given time.
#
# @param _time [Time] time to search for
# @return [Hash, nil] the measurement at the index returned by
#   index_by_time, or nil when that method returns nil
def at(_time)
  position = index_by_time(_time)
  position.nil? ? nil : self[position]
end

#clean_up!(_interval = 10*60) ⇒ Object

Clean measurements from buffer older than X seconds from last archived one



218
219
220
221
# File 'lib/meas_receiver/meas_type_buffer.rb', line 218

# Drop measurements that lie more than +_interval+ seconds before the last
# stored one.
#
# The age in seconds is converted to a number of samples using the average
# sampling interval, then passed to clean_up_stored!.
#
# @param _interval [Numeric] how many seconds to keep before the last
#   stored measurement (default: 10 minutes)
# @return [Object, nil] result of clean_up_stored!, or nil when nothing
#   could be cleaned
def clean_up!(_interval = 10*60)
  # With no samples there is no time span, so no interval can be computed.
  return nil if @size.to_i <= 0 || @time_from.nil? || @time_to.nil?

  step = interval
  # A zero interval would make the sample count infinite (and Float#round
  # raise); such a window covers the whole buffer, so nothing is removed.
  return nil unless step > 0

  clean_up_stored!((_interval / step).round)
end

#clean_up_stored!(_before = 0) ⇒ Object



223
224
225
226
227
# File 'lib/meas_receiver/meas_type_buffer.rb', line 223

# Remove buffer entries that precede the last stored index, optionally
# keeping +_before+ extra entries in front of it.
#
# @param _before [Integer] number of entries to keep before @storage_last_i
# @return [Object] result of clean_up_to!
def clean_up_stored!(_before = 0)
  cut = @storage_last_i - _before
  # Never cut at a negative index.
  clean_up_to!(cut < 0 ? 0 : cut)
end

#clean_up_to!(i) ⇒ Object

Remove everything before “i”



230
231
232
233
234
235
236
237
238
# File 'lib/meas_receiver/meas_type_buffer.rb', line 230

# Remove every buffer entry before index +i+ and shift the bookkeeping
# (size, last stored index, start time) accordingly. Runs under @mutex.
#
# @param i [Integer] index of the first entry to keep
# @return [Time] the updated @time_from
def clean_up_to!(i)
  @mutex.synchronize do
    # Time covered by the removed entries; taken before @size changes,
    # because interval depends on it.
    removed_span = self.interval * i

    @buffer = @buffer[i..-1]
    @size -= i
    @storage_last_i -= i
    @time_from += removed_span
  end
end

#firstObject



69
70
71
# File 'lib/meas_receiver/meas_type_buffer.rb', line 69

# Measurement at buffer index 0.
#
# @return [Object] result of self[0]
def first
  return self[0]
end

#index_by_time(_time) ⇒ Object

Get measurement index for given time



78
79
80
81
# File 'lib/meas_receiver/meas_type_buffer.rb', line 78

# Translate a time into the nearest buffer index.
#
# @param _time [Time] time to locate
# @return [Integer, nil] index in 0..@size-1, or nil when the buffer is
#   empty or +_time+ lies outside @time_from..@time_to
def index_by_time(_time)
  # An empty buffer has no time span to compare against.
  return nil if @size.to_i <= 0 || @time_from.nil? || @time_to.nil?
  return nil if _time < @time_from || _time > @time_to

  step = interval.to_f
  # All samples share one timestamp; dividing by zero would give NaN.
  return 0 unless step > 0

  position = ((_time.to_f - @time_from.to_f) / step).round
  # interval is span / size, so times near @time_to round to index @size,
  # one past the last element; clamp to the last valid index.
  [position, @size - 1].min
end

#intervalObject



52
53
54
# File 'lib/meas_receiver/meas_type_buffer.rb', line 52

# Average time per buffered sample: the span from @time_from to @time_to
# divided by @size.
#
# @return [Float] seconds per sample
def interval
  span = @time_to - @time_from
  span.to_f / @size.to_f
end

#lastObject



73
74
75
# File 'lib/meas_receiver/meas_type_buffer.rb', line 73

# Measurement at the highest buffer index (@size - 1).
#
# @return [Object] result of self[@size - 1]
def last
  return self[@size - 1]
end

#mean_raw(i, count = @storage[:avg_side_count].to_i) ⇒ Object

Average/mean raw within buffer



91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/meas_receiver/meas_type_buffer.rb', line 91

# Mean of the raw values in a window centred on index +i+.
#
# The window spans +count+ entries on each side of +i+ and is clipped to
# the buffer bounds. Relies on Array#mean, which is not core Ruby --
# presumably supplied by a project extension or gem (confirm).
#
# @param i [Integer] centre index
# @param count [Integer, nil] entries to include on each side; nil or 0
#   returns the single raw value at +i+
# @return [Object] raw value at +i+, or the mean of the window
def mean_raw(i, count = @storage[:avg_side_count].to_i)
  # mean_value forwards @storage[:avg_side_count] unconverted, so it may
  # arrive here as nil.
  count = count.to_i
  return @buffer[i] if count == 0

  _from = i - count
  _to = i + count

  # Clip the window to the buffer.
  _from = 0 if _from < 0
  _to = @buffer.size - 1 if _to >= @buffer.size

  # Inclusive range: _to is the last index that belongs to the window. An
  # exclusive range would drop the right edge, and element i itself when i
  # is the last index.
  @buffer[_from.._to].mean
end

#mean_value(i, count = @storage[:avg_side_count]) ⇒ Object

Get mean value within buffer



106
107
108
# File 'lib/meas_receiver/meas_type_buffer.rb', line 106

# Converted value of the averaged raw reading around index +i+.
#
# @param i [Integer] centre index
# @param count [Object] side count forwarded to mean_raw
#   (defaults to @storage[:avg_side_count])
# @return [Object] result of raw_to_value applied to the mean raw value
def mean_value(i, count = @storage[:avg_side_count])
  averaged_raw = mean_raw(i, count)
  raw_to_value(averaged_raw)
end

#perform_storage!Object

Executed by scheduler to store important values



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/meas_receiver/meas_type_buffer.rb', line 113

# Select the measurements worth storing from the not-yet-processed part of
# the buffer, remember them in @storage_buffer and hand them to the
# configured storage proc.
#
# The selection runs under @mutex; the storage proc is called after the
# lock has been released.
#
# @return [Array] the measurements selected in this run
def perform_storage!
  @mutex.synchronize do
    @logger.debug("Performing storage for #{self.name.red}") if @debug
    started_at = Time.now

    range = storage_calculate_range
    averaged = storage_calculate_averaged(range)
    segments = storage_get_ranges_to_store(averaged, range)
    @storage_buffer = storage_measurements_to_store(segments, range)

    # Segment indexes are relative to the start of this run, so the end of
    # the last segment is added to the previous offset to get the place
    # where the next run continues.
    unless segments.empty?
      @storage_last_i = segments.last[1] + @storage_last_i.to_i
    end

    @logger.debug("Storage buffer created in #{(Time.now - started_at).to_s.green}") if @debug
  end

  # Hand the result to the storage callback, outside the lock.
  @logger.debug("Storage buffer size is #{@storage_buffer.size.to_s.cyan}") if @debug
  store_proc = @storage[:proc]
  store_proc.call(@storage_buffer) if store_proc
  @logger.debug("Storage completed for #{self.name.red}") if @debug

  @storage_buffer
end

#raw_to_value(raw) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/meas_receiver/meas_type_buffer.rb', line 61

# Convert a raw reading: add the :offset coefficient, multiply by the
# :linear coefficient, then pass the result through the optional :proc
# coefficient.
#
# @param raw [#to_f] raw reading
# @return [Object] converted value (a Float unless the proc returns
#   something else)
def raw_to_value(raw)
  scaled = (raw.to_f + @coefficients[:offset].to_f) * @coefficients[:linear].to_f
  converter = @coefficients[:proc]
  converter ? converter.call(scaled) : scaled
end

#storage_calculate_averaged(_range) ⇒ Object

Prepare averaged values



156
157
158
# File 'lib/meas_receiver/meas_type_buffer.rb', line 156

# Compute the averaged, converted value for every index in the range.
#
# @param _range [Enumerable<Integer>] buffer indexes to process
# @return [Array] one mean_value result per index, in order
def storage_calculate_averaged(_range)
  _range.map { |index| mean_value(index) }
end

#storage_calculate_rangeObject

Calculate the index range for the storage algorithm



144
145
146
147
148
149
150
151
152
153
# File 'lib/meas_receiver/meas_type_buffer.rb', line 144

# Determine which buffer indexes the storage algorithm should look at:
# from the last processed index up to the end of the buffer.
#
# @return [Range] @storage_last_i..(@buffer.size - 1)
def storage_calculate_range
  start_index = @storage_last_i
  end_index = @buffer.size - 1

  if @debug
    @logger.debug("Range to store #{start_index.to_s.magenta}..#{end_index.to_s.magenta}")
  end

  start_index..end_index
end

#storage_get_ranges_to_store(_values, _range) ⇒ Object

Array of index ranges of the measurements to store



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/meas_receiver/meas_type_buffer.rb', line 178

# Split the averaged values into segments that should each be stored as
# one measurement.
#
# Walks the values in order; whenever storage_should_store? accepts the
# current value against the current reference, a segment
# [segment_start, current_index] is emitted and the current position
# becomes the new reference.
#
# @param _values [Array] averaged values, indexed from 0
# @param _range [Range] unused here
# @return [Array<Array(Integer, Integer)>] [from, to] index pairs relative
#   to the start of +_values+
def storage_get_ranges_to_store(_values, _range)
  segments = []
  segment_start = 0
  reference = _values.first

  _values.each_with_index do |value, index|
    next unless storage_should_store?(value, reference, index, segment_start)

    segments << [segment_start, index]
    segment_start = index
    reference = value
  end

  segments
end

#storage_measurements_to_store(_indexes, _range) ⇒ Object

Fill time_from using previous measurement



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/meas_receiver/meas_type_buffer.rb', line 199

# Turn [from, to] index pairs into measurement hashes ready for storage.
#
# Each result is a copy of the measurement at the "from" index whose :time
# key is replaced by :time_from (its own time) and :time_to (the time of
# the measurement at the "to" index).
#
# @param _indexes [Array<Array(Integer, Integer)>] index pairs relative to
#   @storage_last_i
# @param _range [Range] unused here
# @return [Array<Hash>] measurements with :time_from and :time_to
def storage_measurements_to_store(_indexes, _range)
  _indexes.map do |from_index, to_index|
    # The pairs are relative, so shift them by the storage offset.
    head = self[from_index + @storage_last_i]
    tail = self[to_index + @storage_last_i]

    entry = head.clone
    entry[:time_from] = entry.delete(:time)
    entry[:time_to] = tail[:time]
    entry
  end
end

#storage_should_store?(_value, _ref_value, _value_time, _ref_value_time) ⇒ Boolean

Store if the value differs by more than X and is newer than Y; force storing when it is newer than Z

Returns:

  • (Boolean)


167
168
169
170
171
172
173
174
175
# File 'lib/meas_receiver/meas_type_buffer.rb', line 167

# Decide whether a value should start a new stored measurement.
#
# It is stored when it deviates enough from the reference value and more
# than :min_unit_interval index units have passed, or unconditionally once
# more than :max_unit_interval index units have passed.
#
# @param _value [Numeric, nil] candidate value
# @param _ref_value [Numeric, nil] value of the last stored point
# @param _value_time [Integer] index of the candidate
# @param _ref_value_time [Integer] index of the last stored point
# @return [Boolean]
def storage_should_store?(_value, _ref_value, _value_time, _ref_value_time)
  elapsed = _value_time - _ref_value_time

  deviates = storage_should_store_value?(_ref_value, _value)
  past_minimum = elapsed.abs > @storage[:min_unit_interval]
  past_maximum = elapsed > @storage[:max_unit_interval]

  (deviates && past_minimum) || past_maximum
end

#storage_should_store_value?(value_a, value_b) ⇒ Boolean

Check value deviation

Returns:

  • (Boolean)


161
162
163
164
# File 'lib/meas_receiver/meas_type_buffer.rb', line 161

# Check whether two values differ by more than the configured
# :value_deviation. A missing value on either side counts as a difference.
#
# @param value_a [Numeric, nil]
# @param value_b [Numeric, nil]
# @return [Boolean]
def storage_should_store_value?(value_a, value_b)
  return true if [value_a, value_b].any?(&:nil?)

  threshold = @storage[:value_deviation].to_f
  (value_a - value_b).abs > threshold
end