Class: PulseMeter::Sensor::Timeline Abstract

Inherits:
Base
  • Object
show all
Includes:
Mixins::Utils
Defined in:
lib/pulse-meter/sensor/timeline.rb

Overview

This class is abstract.

Represents timelined sensor: series of values, one value for each consequent time interval.

Constant Summary collapse

DEFAULTS =

Default values for some sensor parameters

{
  :raw_data_ttl => 3600,
  :reduce_delay => 60,
}

Constants included from Mixins::Dumper

Mixins::Dumper::DUMP_REDIS_KEY

Instance Attribute Summary collapse

Attributes inherited from Base

#name, #redis

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Mixins::Utils

#assert_positive_integer!, #assert_ranged_float!, #camelize, #camelize_keys, #constantize, #symbolize_keys, #titleize, #uniqid

Methods inherited from Base

#annotate, #annotation

Methods included from Mixins::Dumper

included

Constructor Details

#initialize(name, options) ⇒ Timeline

Initializes sensor with given name and parameters

Parameters:

  • name (String)

    sensor name

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :interval (Fixnum)

    Rotation interval

  • :ttl (Fixnum)

    How long summarized data will be stored before expiration

  • :raw_data_ttl (Fixnum)

    How long unsummarized raw data will be stored before expiration

  • :reduce_delay (Fixnum)

    Delay between end of interval and summarization



32
33
34
35
36
37
38
# File 'lib/pulse-meter/sensor/timeline.rb', line 32

def initialize(name, options)
  @interval = assert_positive_integer!(options, :interval)
  @ttl = assert_positive_integer!(options, :ttl)
  @raw_data_ttl = assert_positive_integer!(options, :raw_data_ttl, DEFAULTS[:raw_data_ttl])
  @reduce_delay = assert_positive_integer!(options, :reduce_delay, DEFAULTS[:reduce_delay])
  super
end

Instance Attribute Details

#intervalFixnum (readonly)

Returns Rotation interval.

Returns:

  • (Fixnum)

    Rotation interval



18
19
20
# File 'lib/pulse-meter/sensor/timeline.rb', line 18

def interval
  @interval
end

#raw_data_ttlFixnum (readonly)

Returns How long unsummarized raw data will be stored before expiration.

Returns:

  • (Fixnum)

    How long unsummarized raw data will be stored before expiration



18
# File 'lib/pulse-meter/sensor/timeline.rb', line 18

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

#reduce_delayObject (readonly)

Returns the value of attribute reduce_delay.



18
# File 'lib/pulse-meter/sensor/timeline.rb', line 18

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

#ttlFixnum (readonly)

Returns How long summarized data will be stored before expiration.

Returns:

  • (Fixnum)

    How long summarized data will be stored before expiration



18
# File 'lib/pulse-meter/sensor/timeline.rb', line 18

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

Class Method Details

.reduce_all_rawObject



88
89
90
91
92
# File 'lib/pulse-meter/sensor/timeline.rb', line 88

def self.reduce_all_raw
  list_objects.each do |sensor|
    sensor.reduce_all_raw if sensor.respond_to? :reduce_all_raw
  end
end

Instance Method Details

#aggregate_event(key, value) ⇒ Object

This method is abstract.

Registeres event for current interval identified by key

Parameters:

  • key (Fixnum)

    interval id

  • value (Object)

    value to be aggregated



166
167
168
169
# File 'lib/pulse-meter/sensor/timeline.rb', line 166

def aggregate_event(key, value)
  # simple
  redis.set(key, value)
end

#cleanupObject

Clean up all sensor metadata and data



41
42
43
44
45
46
47
# File 'lib/pulse-meter/sensor/timeline.rb', line 41

def cleanup
  keys = redis.keys(raw_data_key('*')) + redis.keys(data_key('*'))
  multi do
    keys.each{|key| redis.del(key)}
  end
  super
end

#current_interval_idFixnum

Returns current interval id

Returns:

  • (Fixnum)


159
160
161
# File 'lib/pulse-meter/sensor/timeline.rb', line 159

def current_interval_id
  get_interval_id(Time.now)
end

#current_raw_data_keyObject

Returns Redis key by which raw data for current interval is stored



135
136
137
# File 'lib/pulse-meter/sensor/timeline.rb', line 135

def current_raw_data_key
  raw_data_key(current_interval_id)
end

#data_key(id) ⇒ Object

Returns Redis key by which summarized data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



147
148
149
# File 'lib/pulse-meter/sensor/timeline.rb', line 147

def data_key(id)
  "pulse_meter:data:#{name}:#{id}"
end

#event(value = nil) ⇒ Object

Processes event



50
51
52
53
54
55
56
# File 'lib/pulse-meter/sensor/timeline.rb', line 50

def event(value = nil)
  multi do
    current_key = current_raw_data_key
    aggregate_event(current_key, value)
    redis.expire(current_key, raw_data_ttl)
  end
end

#get_interval_id(time) ⇒ Object

Returns interval id where given time is

Parameters:

  • time (Time)


153
154
155
# File 'lib/pulse-meter/sensor/timeline.rb', line 153

def get_interval_id(time)
  (time.to_i / interval) * interval
end

#get_timeline_value(interval_id) ⇒ SensorData

Returns sensor data for given interval.

If the interval is not over yet makes its data in-memory summarization
and returns calculated value

Parameters:

  • interval_id (Fixnum)

Returns:



126
127
128
129
130
131
132
# File 'lib/pulse-meter/sensor/timeline.rb', line 126

def get_timeline_value(interval_id)
  interval_data_key = data_key(interval_id)
  return SensorData.new(Time.at(interval_id), redis.get(interval_data_key)) if redis.exists(interval_data_key)
  interval_raw_data_key = raw_data_key(interval_id)
  return SensorData.new(Time.at(interval_id), summarize(interval_raw_data_key)) if redis.exists(interval_raw_data_key)
  SensorData.new(Time.at(interval_id), nil)
end

#raw_data_key(id) ⇒ Object

Returns Redis key by which raw data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



141
142
143
# File 'lib/pulse-meter/sensor/timeline.rb', line 141

def raw_data_key(id)
  "pulse_meter:raw:#{name}:#{id}"
end

#reduce(interval_id) ⇒ Object

Note:

Interval id is just unixtime of its lower bound. Ruduction is a process of ‘compressing’ all interval’s raw data to a single value. When reduction is done summarized data is saved to Redis separately with expiration time taken from sensor configuration.

Reduces data in given interval.

Parameters:

  • interval_id (Fixnum)


66
67
68
69
70
71
72
73
74
75
76
# File 'lib/pulse-meter/sensor/timeline.rb', line 66

def reduce(interval_id)
  interval_raw_data_key = raw_data_key(interval_id)
  return unless redis.exists(interval_raw_data_key)
  value = summarize(interval_raw_data_key)
  interval_data_key = data_key(interval_id)
  multi do
    redis.del(interval_raw_data_key)
    redis.set(interval_data_key, value)
    redis.expire(interval_data_key, ttl)
  end
end

#reduce_all_rawObject

Reduces data in all raw interval



79
80
81
82
83
84
85
86
# File 'lib/pulse-meter/sensor/timeline.rb', line 79

def reduce_all_raw
  min_time = Time.now - reduce_delay - interval
  redis.keys(raw_data_key('*')).each do |key|
    interval_id = key.split(':').last
    next if Time.at(interval_id.to_i) > min_time
    reduce(interval_id)
  end
end

#summarize(key) ⇒ Object

This method is abstract.

Summarizes all event within interval to a single value

Parameters:

  • key (Fixnum)

    interval_id



173
174
175
176
# File 'lib/pulse-meter/sensor/timeline.rb', line 173

def summarize(key)
  # simple
  redis.get(key)
end

#timeline(time_ago) ⇒ Array<SensorData>

Returts sensor data within some last seconds

Parameters:

  • time_ago (Fixnum)

    interval length in seconds

Returns:

Raises:

  • ArgumentError if argumets are not valid time objects



98
99
100
101
102
# File 'lib/pulse-meter/sensor/timeline.rb', line 98

def timeline(time_ago)
  raise ArgumentError unless time_ago.respond_to?(:to_i) && time_ago.to_i > 0
  now = Time.now
  timeline_within(now - time_ago.to_i, now)
end

#timeline_within(from, till) ⇒ Array<SensorData>

Returts sensor data within given time

Parameters:

  • from (Time)

    lower bound

  • till (Time)

    upper bound

Returns:

Raises:

  • ArgumentError if argumets are not valid time objects



109
110
111
112
113
114
115
116
117
118
119
# File 'lib/pulse-meter/sensor/timeline.rb', line 109

def timeline_within(from, till)
  raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time)
  start_time, end_time = from.to_i, till.to_i
  current_interval_id = get_interval_id(start_time) + interval
  res = []
  while current_interval_id < end_time
    res << get_timeline_value(current_interval_id)
    current_interval_id += interval
  end
  res
end