Class: PulseMeter::Sensor::Timeline Abstract
- Includes:
- Mixins::Utils, TimelineReduce
- Defined in:
- lib/pulse-meter/sensor/timeline.rb
Overview
Represents timelined sensor: series of values, one value for each consequent time interval.
Direct Known Subclasses
PulseMeter::Sensor::Timelined::Average, PulseMeter::Sensor::Timelined::Counter, PulseMeter::Sensor::Timelined::HashedCounter, PulseMeter::Sensor::Timelined::HashedIndicator, PulseMeter::Sensor::Timelined::Indicator, PulseMeter::Sensor::Timelined::UniqCounter, PulseMeter::Sensor::Timelined::ZSetBased
Constant Summary collapse
- MAX_TIMESPAN_POINTS =
1000
- DEFAULTS =
Default values for some sensor parameters
{ :raw_data_ttl => 3600, :reduce_delay => 60, }
Constants included from TimelineReduce
PulseMeter::Sensor::TimelineReduce::MAX_INTERVALS
Constants included from Mixins::Dumper
Mixins::Dumper::DUMP_REDIS_KEY
Instance Attribute Summary collapse
-
#interval ⇒ Fixnum
readonly
Rotation interval.
-
#raw_data_ttl ⇒ Fixnum
readonly
How long unsummarized raw data will be stored before expiration.
-
#reduce_delay ⇒ Object
readonly
Returns the value of attribute reduce_delay.
-
#ttl ⇒ Fixnum
readonly
How long summarized data will be stored before expiration.
Attributes inherited from Base
Instance Method Summary collapse
- #aggregate_event(key, value) ⇒ Object abstract
-
#cleanup ⇒ Object
Clean up all sensor metadata and data.
-
#current_interval_id ⇒ Fixnum
Returns current interval id.
-
#current_raw_data_key ⇒ Object
Returns Redis key by which raw data for current interval is stored.
-
#data_key(id) ⇒ Object
Returns Redis key by which summarized data for given interval is stored.
- #deflate_safe(value) ⇒ Object abstract
-
#drop_within(from, till) ⇒ Object
Drops sensor data within given time.
-
#event_at(time, value = nil) ⇒ Object
Processes event from the past.
-
#get_interval_id(time) ⇒ Object
Returns interval id where given time is.
-
#get_raw_value(interval_id) ⇒ SensorData
Returns sensor data for given interval making in-memory summarization and returns calculated value.
-
#initialize(name, options) ⇒ Timeline
constructor
Initializes sensor with given name and parameters.
-
#raw_data_key(id) ⇒ Object
Returns Redis key by which raw data for given interval is stored.
- #summarize(key) ⇒ Object abstract
-
#timeline(time_ago) ⇒ Array<SensorData>
Returts sensor data within some last seconds.
-
#timeline_within(from, till, skip_optimization = false) ⇒ Array<SensorData>
Returts sensor data within given time.
Methods included from TimelineReduce
#collect_ids_to_reduce, included, #reduce, #reduce_all_raw
Methods included from Mixins::Utils
#assert_array!, #assert_positive_integer!, #assert_ranged_float!, #camelize, #camelize_keys, #constantize, #each_subset, #parse_time, #subsets_of, #symbolize_keys, #titleize, #uniqid
Methods inherited from Base
#annotate, #annotation, #command_aggregator, #event
Methods included from Mixins::Dumper
Constructor Details
#initialize(name, options) ⇒ Timeline
Initializes sensor with given name and parameters
35 36 37 38 39 40 41 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 35 def initialize(name, ) @interval = assert_positive_integer!(, :interval) @ttl = assert_positive_integer!(, :ttl) @raw_data_ttl = assert_positive_integer!(, :raw_data_ttl, DEFAULTS[:raw_data_ttl]) @reduce_delay = assert_positive_integer!(, :reduce_delay, DEFAULTS[:reduce_delay]) super end |
Instance Attribute Details
#interval ⇒ Fixnum (readonly)
Returns Rotation interval.
21 22 23 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 21 def interval @interval end |
#raw_data_ttl ⇒ Fixnum (readonly)
Returns How long unsummarized raw data will be stored before expiration.
21 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 21 attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay |
#reduce_delay ⇒ Object (readonly)
Returns the value of attribute reduce_delay.
21 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 21 attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay |
#ttl ⇒ Fixnum (readonly)
Returns How long summarized data will be stored before expiration.
21 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 21 attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay |
Instance Method Details
#aggregate_event(key, value) ⇒ Object
Registeres event for current interval identified by key
154 155 156 157 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 154 def aggregate_event(key, value) # simple redis.set(key, value) end |
#cleanup ⇒ Object
Clean up all sensor metadata and data
44 45 46 47 48 49 50 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 44 def cleanup keys = redis.keys(raw_data_key('*')) + redis.keys(data_key('*')) multi do keys.each{|key| redis.del(key)} end super end |
#current_interval_id ⇒ Fixnum
Returns current interval id
147 148 149 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 147 def current_interval_id get_interval_id(Time.now) end |
#current_raw_data_key ⇒ Object
Returns Redis key by which raw data for current interval is stored
123 124 125 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 123 def current_raw_data_key raw_data_key(current_interval_id) end |
#data_key(id) ⇒ Object
Returns Redis key by which summarized data for given interval is stored
135 136 137 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 135 def data_key(id) "pulse_meter:data:#{name}:#{id}" end |
#deflate_safe(value) ⇒ Object
Deflates data taken from redis as string preserving nil values
168 169 170 171 172 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 168 def deflate_safe(value) value.nil? ? nil : deflate(value) rescue nil end |
#drop_within(from, till) ⇒ Object
Drops sensor data within given time
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 109 def drop_within(from, till) raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time) start_time, end_time = from.to_i, till.to_i current_interval_id = get_interval_id(start_time) + interval keys = [] while current_interval_id < end_time keys << data_key(current_interval_id) keys << raw_data_key(current_interval_id) current_interval_id += interval end keys.empty? ? 0 : redis.del(*keys) end |
#event_at(time, value = nil) ⇒ Object
Processes event from the past
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 55 def event_at(time, value = nil) multi do interval_id = get_interval_id(time) key = raw_data_key(interval_id) aggregate_event(key, value) command_aggregator.expire(key, raw_data_ttl) end true rescue StandardError => e false end |
#get_interval_id(time) ⇒ Object
Returns interval id where given time is
141 142 143 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 141 def get_interval_id(time) (time.to_i / interval) * interval end |
#get_raw_value(interval_id) ⇒ SensorData
Returns sensor data for given interval making in-memory summarization
and returns calculated value
96 97 98 99 100 101 102 103 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 96 def get_raw_value(interval_id) interval_raw_data_key = raw_data_key(interval_id) if redis.exists(interval_raw_data_key) sensor_data(interval_id, summarize(interval_raw_data_key)) else sensor_data(interval_id, nil) end end |
#raw_data_key(id) ⇒ Object
Returns Redis key by which raw data for given interval is stored
129 130 131 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 129 def raw_data_key(id) "pulse_meter:raw:#{name}:#{id}" end |
#summarize(key) ⇒ Object
Summarizes all event within interval to a single value
161 162 163 164 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 161 def summarize(key) # simple redis.get(key) end |
#timeline(time_ago) ⇒ Array<SensorData>
Returts sensor data within some last seconds
71 72 73 74 75 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 71 def timeline(time_ago) raise ArgumentError unless time_ago.respond_to?(:to_i) && time_ago.to_i > 0 now = Time.now timeline_within(now - time_ago.to_i, now) end |
#timeline_within(from, till, skip_optimization = false) ⇒ Array<SensorData>
Returts sensor data within given time
83 84 85 86 87 88 89 90 |
# File 'lib/pulse-meter/sensor/timeline.rb', line 83 def timeline_within(from, till, skip_optimization = false) raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time) start_time, end_time = from.to_i, till.to_i actual_interval = optimized_interval(start_time, end_time, skip_optimization) start_interval_id = get_interval_id(start_time) + actual_interval ids, values = fetch_reduced_interval_data(start_interval_id, actual_interval, end_time) zip_with_raw_data(ids, values) end |