Class: MeasReceiver::MeasTypeBuffer
- Inherits:
-
Object
- Object
- MeasReceiver::MeasTypeBuffer
- Defined in:
- lib/meas_receiver/meas_type_buffer.rb
Instance Attribute Summary collapse
-
#buffer ⇒ Object
Returns the value of attribute buffer.
-
#coefficients ⇒ Object
Returns the value of attribute coefficients.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
-
#storage ⇒ Object
Returns the value of attribute storage.
-
#storage_buffer ⇒ Object
readonly
Returns the value of attribute storage_buffer.
-
#storage_last_i ⇒ Object
readonly
Returns the value of attribute storage_last_i.
-
#time_from ⇒ Object
Returns the value of attribute time_from.
-
#time_to ⇒ Object
Returns the value of attribute time_to.
Instance Method Summary collapse
- #[](i) ⇒ Object
-
#add!(v) ⇒ Object
add raw value.
-
#after_add ⇒ Object
Execute proc using last fetched measurement.
-
#at(_time) ⇒ Object
Search measurement in buffer.
-
#clean_up!(_interval = 10*60) ⇒ Object
Clean measurements from buffer older than X seconds from last archived one.
- #clean_up_stored!(_before = 0) ⇒ Object
-
#clean_up_to!(i) ⇒ Object
Remove everything before āiā.
- #first ⇒ Object
-
#index_by_time(_time) ⇒ Object
Get measurement index for given time.
-
#initialize(_meas_type) ⇒ MeasTypeBuffer
constructor
A new instance of MeasTypeBuffer.
- #interval ⇒ Object
- #last ⇒ Object
-
#mean_raw(i, count = @storage[:avg_side_count].to_i) ⇒ Object
Average/mean raw within buffer.
-
#mean_value(i, count = @storage[:avg_side_count]) ⇒ Object
Get mean value within buffer.
-
#perform_storage! ⇒ Object
Executed by scheduler to store important values.
- #raw_to_value(raw) ⇒ Object
-
#storage_calculate_averaged(_range) ⇒ Object
Prepare averaged values.
-
#storage_calculate_range ⇒ Object
Calculate range to storage algorithm.
-
#storage_get_ranges_to_store(_values, _range) ⇒ Object
Array of indexes measurements to store.
-
#storage_measurements_to_store(_indexes, _range) ⇒ Object
Fill time_from using previous measurement.
-
#storage_should_store?(_value, _ref_value, _value_time, _ref_value_time) ⇒ Boolean
Store if value if different more than X and is newer tan Y, force when it is newer than Z.
-
#storage_should_store_value?(value_a, value_b) ⇒ Boolean
Check value deviation.
Constructor Details
#initialize(_meas_type) ⇒ MeasTypeBuffer
Returns a new instance of MeasTypeBuffer.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 8 def initialize(_meas_type) @meas_type = _meas_type @logger = @meas_type.logger @debug = @meas_type.debug @name = @meas_type.name @coefficients = _meas_type.coefficients @after_proc = _meas_type.after_proc @storage = _meas_type.storage # index from which start storage algorithm @storage_last_i = 0 # last storage buffer @storage_buffer = Array.new @buffer = Array.new @size = 0 @mutex = Mutex.new end |
Instance Attribute Details
#buffer ⇒ Object
Returns the value of attribute buffer.
28 29 30 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28 def buffer @buffer end |
#coefficients ⇒ Object
Returns the value of attribute coefficients.
28 29 30 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28 def coefficients @coefficients end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
29 30 31 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29 def name @name end |
#size ⇒ Object (readonly)
Returns the value of attribute size.
29 30 31 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29 def size @size end |
#storage ⇒ Object
Returns the value of attribute storage.
28 29 30 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28 def storage @storage end |
#storage_buffer ⇒ Object (readonly)
Returns the value of attribute storage_buffer.
29 30 31 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29 def storage_buffer @storage_buffer end |
#storage_last_i ⇒ Object (readonly)
Returns the value of attribute storage_last_i.
29 30 31 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 29 def storage_last_i @storage_last_i end |
#time_from ⇒ Object
Returns the value of attribute time_from.
28 29 30 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28 def time_from @time_from end |
#time_to ⇒ Object
Returns the value of attribute time_to.
28 29 30 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 28 def time_to @time_to end |
Instance Method Details
#[](i) ⇒ Object
56 57 58 59 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 56 def [](i) return nil if i < 0 and i >= @size { time: @time_from + interval * i, raw: @buffer[i], value: raw_to_value(@buffer[i]) } end |
#add!(v) ⇒ Object
add raw value
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 32 def add!(v) @mutex.synchronize do @size += 1 @buffer << v @time_from ||= Time.now @time_to = Time.now @logger.debug("Added #{v.to_s.yellow} to buffer, size #{@size.to_s.blue}") if @debug end after_add end |
#after_add ⇒ Object
Execute proc using last fetched measurement
46 47 48 49 50 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 46 def after_add unless @after_proc.nil? @after_proc.call(self.last) end end |
#at(_time) ⇒ Object
Search measurement in buffer
84 85 86 87 88 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 84 def at(_time) _i = index_by_time(_time) return nil if _i.nil? return self[_i] end |
#clean_up!(_interval = 10*60) ⇒ Object
Clean measurements from buffer older than X seconds from last archived one
218 219 220 221 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 218 def clean_up!(_interval = 10*60) _before_count = (_interval / self.interval).round clean_up_stored!(_before_count) end |
#clean_up_stored!(_before = 0) ⇒ Object
223 224 225 226 227 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 223 def clean_up_stored!(_before = 0) _i = @storage_last_i - _before _i = 0 if _i < 0 clean_up_to!(_i) end |
#clean_up_to!(i) ⇒ Object
Remove everything before āiā
230 231 232 233 234 235 236 237 238 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 230 def clean_up_to!(i) @mutex.synchronize do _interval = self.interval @buffer = @buffer[i..-1] @size -= i @storage_last_i -= i @time_from += _interval * i end end |
#first ⇒ Object
69 70 71 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 69 def first self[0] end |
#index_by_time(_time) ⇒ Object
Get measurement index for given time
78 79 80 81 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 78 def index_by_time(_time) return nil if _time < @time_from or _time > @time_to return ((_time.to_f - @time_from.to_f)/interval.to_f).round end |
#interval ⇒ Object
52 53 54 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 52 def interval (@time_to - @time_from).to_f / @size.to_f end |
#last ⇒ Object
73 74 75 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 73 def last self[@size - 1] end |
#mean_raw(i, count = @storage[:avg_side_count].to_i) ⇒ Object
Average/mean raw within buffer
91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 91 def mean_raw(i, count = @storage[:avg_side_count].to_i) return @buffer[i] if count == 0 _from = i - count _to = i + count # not in range _from = 0 if _from < 0 _to = @buffer.size - 1 if _to >= @buffer.size _a = @buffer[_from..._to] return _a.mean end |
#mean_value(i, count = @storage[:avg_side_count]) ⇒ Object
Get mean value within buffer
106 107 108 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 106 def mean_value(i, count = @storage[:avg_side_count]) raw_to_value(mean_raw(i, count)) end |
#perform_storage! ⇒ Object
Executed by scheduler to store important values
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 113 def perform_storage! @mutex.synchronize do @logger.debug("Performing storage for #{self.name.red}") if @debug t = Time.now _range = storage_calculate_range _avg = storage_calculate_averaged(_range) _indexes = storage_get_ranges_to_store(_avg, _range) _m = storage_measurements_to_store(_indexes, _range) @storage_buffer = _m # mark from where continue next time, it is r if _indexes.size > 0 @storage_last_i = _indexes.last[1] + @storage_last_i.to_i end @logger.debug("Storage buffer created in #{(Time.now - t).to_s.green}") if @debug # after this mutex is not needed end # call proc @logger.debug("Storage buffer size is #{@storage_buffer.size.to_s.cyan}") if @debug if @storage[:proc] @storage[:proc].call(@storage_buffer) end @logger.debug("Storage completed for #{self.name.red}") if @debug return @storage_buffer end |
#raw_to_value(raw) ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 61 def raw_to_value(raw) _r = (raw.to_f + @coefficients[:offset].to_f) * @coefficients[:linear].to_f if @coefficients[:proc] _r = @coefficients[:proc].call(_r) end return _r end |
#storage_calculate_averaged(_range) ⇒ Object
Prepare averaged values
156 157 158 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 156 def storage_calculate_averaged(_range) _range.collect { |i| mean_value(i) } end |
#storage_calculate_range ⇒ Object
Calculate range to storage algorithm
144 145 146 147 148 149 150 151 152 153 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 144 def storage_calculate_range # from where continue _from = @storage_last_i # only check measurements up to _to = @buffer.size - 1 @logger.debug("Range to store #{_from.to_s.magenta}..#{_to.to_s.magenta}") if @debug return _from.._to end |
#storage_get_ranges_to_store(_values, _range) ⇒ Object
Array of indexes measurements to store
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 178 def storage_get_ranges_to_store(_values, _range) _array = Array.new _from = 0 _ref_value = _values.first (0...(_values.size)).each do |_time| _value = _values[_time] if storage_should_store?(_value, _ref_value, _time, _from) _array << [_from, _time] _from = _time _ref_value = _value end end return _array end |
#storage_measurements_to_store(_indexes, _range) ⇒ Object
Fill time_from using previous measurement
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 199 def storage_measurements_to_store(_indexes, _range) # need to add @storage_last_i r = _indexes.collect { |is| _from = self[is[0] + @storage_last_i] _to = self[is[1] + @storage_last_i] _m = _from.clone _m[:time_from] = _m[:time] _m[:time_to] = _to[:time] _m.delete(:time) _m } return r end |
#storage_should_store?(_value, _ref_value, _value_time, _ref_value_time) ⇒ Boolean
Store if value if different more than X and is newer tan Y, force when it is newer than Z
167 168 169 170 171 172 173 174 175 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 167 def storage_should_store?(_value, _ref_value, _value_time, _ref_value_time) _ref_value_nil = _ref_value.nil? _value_diff = storage_should_store_value?(_ref_value, _value) _time_min = (_value_time - _ref_value_time).abs > @storage[:min_unit_interval] _time_max = (_value_time - _ref_value_time) > @storage[:max_unit_interval] _r = ((_value_diff and _time_min) or _time_max) # puts "#{_r} - v: #{_value}, #{_ref_value}, #{_value_time}, #{_ref_value_time} -> #{_ref_value_nil} or (#{_value_diff} and #{_time_min}) or #{_time_max}" return _r end |
#storage_should_store_value?(value_a, value_b) ⇒ Boolean
Check value deviation
161 162 163 164 |
# File 'lib/meas_receiver/meas_type_buffer.rb', line 161 def storage_should_store_value?(value_a, value_b) return true if value_a.nil? or value_b.nil? (value_a - value_b).abs > @storage[:value_deviation].to_f end |