Class: PulseMeter::Sensor::Timeline Abstract

Inherits:
Base
  • Object
show all
Includes:
Mixins::Utils, TimelineReduce
Defined in:
lib/pulse-meter/sensor/timeline.rb

Overview

This class is abstract.

Represents timelined sensor: series of values, one value for each consequent time interval.

Constant Summary collapse

MAX_TIMESPAN_POINTS =
1000
DEFAULTS =

Default values for some sensor parameters

{
  :raw_data_ttl => 3600,
  :reduce_delay => 60,
}

Constants included from TimelineReduce

PulseMeter::Sensor::TimelineReduce::MAX_INTERVALS

Constants included from Mixins::Dumper

Mixins::Dumper::DUMP_REDIS_KEY

Instance Attribute Summary collapse

Attributes inherited from Base

#name, #redis

Instance Method Summary collapse

Methods included from TimelineReduce

#collect_ids_to_reduce, included, #reduce, #reduce_all_raw

Methods included from Mixins::Utils

#assert_array!, #assert_positive_integer!, #assert_ranged_float!, #camelize, #camelize_keys, #constantize, #each_subset, #parse_time, #subsets_of, #symbolize_keys, #titleize, #uniqid

Methods inherited from Base

#annotate, #annotation, #command_aggregator, #event

Methods included from Mixins::Dumper

included

Constructor Details

#initialize(name, options) ⇒ Timeline

Initializes sensor with given name and parameters

Parameters:

  • name (String)

    sensor name

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :interval (Fixnum)

    Rotation interval

  • :ttl (Fixnum)

    How long summarized data will be stored before expiration

  • :raw_data_ttl (Fixnum)

    How long unsummarized raw data will be stored before expiration

  • :reduce_delay (Fixnum)

    Delay between end of interval and summarization



35
36
37
38
39
40
41
# File 'lib/pulse-meter/sensor/timeline.rb', line 35

def initialize(name, options)
  @interval = assert_positive_integer!(options, :interval)
  @ttl = assert_positive_integer!(options, :ttl)
  @raw_data_ttl = assert_positive_integer!(options, :raw_data_ttl, DEFAULTS[:raw_data_ttl])
  @reduce_delay = assert_positive_integer!(options, :reduce_delay, DEFAULTS[:reduce_delay])
  super
end

Instance Attribute Details

#intervalFixnum (readonly)

Returns Rotation interval.

Returns:

  • (Fixnum)

    Rotation interval



21
22
23
# File 'lib/pulse-meter/sensor/timeline.rb', line 21

def interval
  @interval
end

#raw_data_ttlFixnum (readonly)

Returns How long unsummarized raw data will be stored before expiration.

Returns:

  • (Fixnum)

    How long unsummarized raw data will be stored before expiration



21
# File 'lib/pulse-meter/sensor/timeline.rb', line 21

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

#reduce_delayObject (readonly)

Returns the value of attribute reduce_delay.



21
# File 'lib/pulse-meter/sensor/timeline.rb', line 21

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

#ttlFixnum (readonly)

Returns How long summarized data will be stored before expiration.

Returns:

  • (Fixnum)

    How long summarized data will be stored before expiration



21
# File 'lib/pulse-meter/sensor/timeline.rb', line 21

attr_reader :interval, :ttl, :raw_data_ttl, :reduce_delay

Instance Method Details

#aggregate_event(key, value) ⇒ Object

This method is abstract.

Registeres event for current interval identified by key

Parameters:

  • key (Fixnum)

    interval id

  • value (Object)

    value to be aggregated



154
155
156
157
# File 'lib/pulse-meter/sensor/timeline.rb', line 154

def aggregate_event(key, value)
  # simple
  redis.set(key, value)
end

#cleanupObject

Clean up all sensor metadata and data



44
45
46
47
48
49
50
# File 'lib/pulse-meter/sensor/timeline.rb', line 44

def cleanup
  keys = redis.keys(raw_data_key('*')) + redis.keys(data_key('*'))
  multi do
    keys.each{|key| redis.del(key)}
  end
  super
end

#current_interval_idFixnum

Returns current interval id

Returns:

  • (Fixnum)


147
148
149
# File 'lib/pulse-meter/sensor/timeline.rb', line 147

def current_interval_id
  get_interval_id(Time.now)
end

#current_raw_data_keyObject

Returns Redis key by which raw data for current interval is stored



123
124
125
# File 'lib/pulse-meter/sensor/timeline.rb', line 123

def current_raw_data_key
  raw_data_key(current_interval_id)
end

#data_key(id) ⇒ Object

Returns Redis key by which summarized data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



135
136
137
# File 'lib/pulse-meter/sensor/timeline.rb', line 135

def data_key(id)
  "pulse_meter:data:#{name}:#{id}"
end

#deflate_safe(value) ⇒ Object

This method is abstract.

Deflates data taken from redis as string preserving nil values

Parameters:

  • value (String)

    raw data



168
169
170
171
172
# File 'lib/pulse-meter/sensor/timeline.rb', line 168

def deflate_safe(value)
  value.nil? ? nil : deflate(value)
rescue
  nil
end

#drop_within(from, till) ⇒ Object

Drops sensor data within given time

Parameters:

  • from (Time)

    lower bound

  • till (Time)

    upper bound

Raises:

  • ArgumentError if argumets are not valid time objects



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/pulse-meter/sensor/timeline.rb', line 109

def drop_within(from, till)
  raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time)
  start_time, end_time = from.to_i, till.to_i
  current_interval_id = get_interval_id(start_time) + interval
  keys = []
  while current_interval_id < end_time
    keys << data_key(current_interval_id)
    keys << raw_data_key(current_interval_id)
    current_interval_id += interval
  end
  keys.empty? ? 0 : redis.del(*keys)
end

#event_at(time, value = nil) ⇒ Object

Processes event from the past

Parameters:

  • time (Time)

    event time

  • value (defaults to: nil)

    event value



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/pulse-meter/sensor/timeline.rb', line 55

def event_at(time, value = nil)
  multi do
    interval_id = get_interval_id(time)
    key = raw_data_key(interval_id)
    aggregate_event(key, value)
    command_aggregator.expire(key, raw_data_ttl)
  end
  true
rescue StandardError => e
  false
end

#get_interval_id(time) ⇒ Object

Returns interval id where given time is

Parameters:

  • time (Time)


141
142
143
# File 'lib/pulse-meter/sensor/timeline.rb', line 141

def get_interval_id(time)
  (time.to_i / interval) * interval
end

#get_raw_value(interval_id) ⇒ SensorData

Returns sensor data for given interval making in-memory summarization

and returns calculated value

Parameters:

  • interval_id (Fixnum)

Returns:



96
97
98
99
100
101
102
103
# File 'lib/pulse-meter/sensor/timeline.rb', line 96

def get_raw_value(interval_id)
  interval_raw_data_key = raw_data_key(interval_id)
  if redis.exists(interval_raw_data_key)
    sensor_data(interval_id, summarize(interval_raw_data_key))
  else
    sensor_data(interval_id, nil)
  end
end

#raw_data_key(id) ⇒ Object

Returns Redis key by which raw data for given interval is stored

Parameters:

  • id (Fixnum)

    interval id



129
130
131
# File 'lib/pulse-meter/sensor/timeline.rb', line 129

def raw_data_key(id)
  "pulse_meter:raw:#{name}:#{id}"
end

#summarize(key) ⇒ Object

This method is abstract.

Summarizes all event within interval to a single value

Parameters:

  • key (Fixnum)

    interval_id



161
162
163
164
# File 'lib/pulse-meter/sensor/timeline.rb', line 161

def summarize(key)
  # simple
  redis.get(key)
end

#timeline(time_ago) ⇒ Array<SensorData>

Returts sensor data within some last seconds

Parameters:

  • time_ago (Fixnum)

    interval length in seconds

Returns:

Raises:

  • ArgumentError if argumets are not valid time objects



71
72
73
74
75
# File 'lib/pulse-meter/sensor/timeline.rb', line 71

def timeline(time_ago)
  raise ArgumentError unless time_ago.respond_to?(:to_i) && time_ago.to_i > 0
  now = Time.now
  timeline_within(now - time_ago.to_i, now)
end

#timeline_within(from, till, skip_optimization = false) ⇒ Array<SensorData>

Returts sensor data within given time

Parameters:

  • from (Time)

    lower bound

  • till (Time)

    upper bound

  • skip_optimization (Boolean) (defaults to: false)

    must be set to true to skip interval optimization

Returns:

Raises:

  • ArgumentError if argumets are not valid time objects



83
84
85
86
87
88
89
90
# File 'lib/pulse-meter/sensor/timeline.rb', line 83

def timeline_within(from, till, skip_optimization = false)
  raise ArgumentError unless from.kind_of?(Time) && till.kind_of?(Time)
  start_time, end_time = from.to_i, till.to_i
  actual_interval = optimized_interval(start_time, end_time, skip_optimization)
  start_interval_id = get_interval_id(start_time) + actual_interval
  ids, values = fetch_reduced_interval_data(start_interval_id, actual_interval, end_time)
  zip_with_raw_data(ids, values)
end