Class: Aggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/aggregator.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeAggregator

Returns a new instance of Aggregator.


65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/aggregator.rb', line 65

def initialize
  @rules = []
  @index = true
  @optimize = true
  @excludes = [ 'interfaces', 'routers', 'pruned' ]
  @runlimit = nil
  @reaggregate_interval = 1.month
  @database = nil
  @dry_run = false
  @verbose = false
  @always_gauge = []
end

Instance Attribute Details

#always_gaugeObject

Table names that should always be considered gauge, regardless of autodetection.


56
57
58
# File 'lib/aggregator.rb', line 56

def always_gauge
  @always_gauge
end

#databaseObject

How to access the RTG database. Accepts a hash containing connection parameters.

database = { :host => "localhost", :user => "aggregator", :password => "abc123", :database => "rtg" }

46
47
48
# File 'lib/aggregator.rb', line 46

def database
  @database
end

#dry_runObject

Whether to do any altering operations on the database or not. If set to false, nothing will be done and all altering SQL queries will be echoed to stdout instead. This might fail, if for example the pruned table doesn't exist.

dry_run = false

51
52
53
# File 'lib/aggregator.rb', line 51

def dry_run
  @dry_run
end

#excludesObject

List of tables to exclude. Accepts literal strings (exakt match) and regular expressions. By default certain non time-series tables of RTG. such as interfaces and routers are excluded. If you have custom tables that should not be touched by aggregator, add them here.

excludes << "custom_table"  # Don't touch "custom_table"
excludes << /^dlink/        # Don't touch tables whose name starts with "dlink"

31
32
33
# File 'lib/aggregator.rb', line 31

def excludes
  @excludes
end

#indexObject

Whether or not to index tables before optimization. If this is set to true (default), indexes will be created, if they are missing, on dtime and id. This is recommended for acceptable aggregation performance.

index = true

20
21
22
# File 'lib/aggregator.rb', line 20

def index
  @index
end

#optimizeObject

Whether or not to run OPTIMIZE TABLE after aggregation. Defaults to true.

optimize = true

24
25
26
# File 'lib/aggregator.rb', line 24

def optimize
  @optimize
end

#reaggregate_intervalObject

How ofter to reaggregate a table. Defaults to one month.

reaggregate_interval = 1.month

43
44
45
# File 'lib/aggregator.rb', line 43

def reaggregate_interval
  @reaggregate_interval
end

#rulesObject

List of all currently active rules.


15
16
17
# File 'lib/aggregator.rb', line 15

def rules
  @rules
end

#runlimitObject

Limit run time of aggregator to a certain time span. This may be useful if aggregator is begin run from cron at a certain interval, to avoid overlapping processes. Set to nil (default) to no limit the run time. Note that aggregator always completes a table it has started processing, so the specified runtime might get exceeded.

runlimit = nil         # No limit
runlimit = 50.minutes # Almost guaranteed to be done in an hour.

40
41
42
# File 'lib/aggregator.rb', line 40

def runlimit
  @runlimit
end

#verbose=(value) ⇒ Object

Whether to print short informational messages to stdout during processing.

verbose = false

54
55
56
# File 'lib/aggregator.rb', line 54

def verbose=(value)
  @verbose = value
end

Class Method Details

.aggregate {|aggregator| ... } ⇒ Object

Convenience method to run aggregation. Passes an aggregator instance to the block for configuration, and the runs aggregation.

Yields:

  • (aggregator)

59
60
61
62
63
# File 'lib/aggregator.rb', line 59

def self.aggregate
  aggregator = Aggregator.new
  yield(aggregator)
  aggregator.run
end

Instance Method Details

#runObject

Start the processing.


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/aggregator.rb', line 85

def run
  to_process = tables
  processing_limit_time = Time.new + runlimit
  while to_process.length > 0 && (runlimit.nil? || Time.new < processing_limit_time)
    table = to_process.delete_at(rand(to_process.length))
    if needs_pruning?(table)
      @deletes = 0
      @inserts = 0
      @delete_qs = 0
      @insert_qs = 0
      verbose "Looking at #{table}"

      table_rules = rules_for(table)
      create_indexes(table)

      prev_rule_end_time = nil
      table_rules.each do |rule|
        end_time = rule[:age].ago
        interval = rule[:reduce]
        start_time = prev_rule_end_time || 0

        if rule.key? :drop
          # Drop data older than end_time
          drop_older_db(table, end_time)
        elsif rule.key? :reduce
          # Reduce data older than start_time to a lower precision (aggregate).
          # Aggregate separately for each ID in the table
          ids(table).each do |id|
            aggregate_db(table, id, start_time, end_time, interval)
          end
        end
        prev_rule_end_time = end_time
      end

      if @inserts > 0
        verbose "  Inserted #{@inserts} rows (individually)."
      end

      if @deletes > 0
        verbose "  Deleted #{@deletes} rows in #{@delete_qs} queries (about #{(@deletes / @delete_qs).to_i} rows/q)"
      end

      optimize_table(table)
      set_pruned(table)
    end
  end
end