Class: UState::Aggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/ustate/aggregator.rb

Constant Summary collapse

INTERVAL =

Combines states periodically.

1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(index, opts = {}) ⇒ Aggregator

Returns a new instance of Aggregator.



9
10
11
12
13
14
15
16
17
# File 'lib/ustate/aggregator.rb', line 9

def initialize(index, opts = {})
  @index = index

  @folds = {}
  @interval = opts[:interval] || INTERVAL
  @server = opts[:server]

  start
end

Instance Attribute Details

#foldsObject

Returns the value of attribute folds.



8
9
10
# File 'lib/ustate/aggregator.rb', line 8

def folds
  @folds
end

#intervalObject

Returns the value of attribute interval.



7
8
9
# File 'lib/ustate/aggregator.rb', line 7

def interval
  @interval
end

Instance Method Details

#average(query, *a) ⇒ Object

Combines states matching query with State.average



20
21
22
23
24
# File 'lib/ustate/aggregator.rb', line 20

def average(query, *a)
  fold query do |states|
    State.average states, *a
  end
end

#average_over_hosts(query) ⇒ Object

Average states with the same service across varying hosts. Useful when you don’t know the exact services you’d like to fold. The resulting host for each folded state will be nil.



29
30
31
32
33
34
35
# File 'lib/ustate/aggregator.rb', line 29

def average_over_hosts(query)
  fold query do |states|
    State.partition(states, :service).values.map do |slice|
      State.sum slice, {host: nil}
    end
  end
end

#fold(query, &block) ⇒ Object

Combines states matching query with the given block. The block receives an array of states which presently match.

Example:

fold 'service = "api % reqs/sec"' do |states|
  states.inject(State.new(service: 'api reqs/sec')) do |combined, state|
    combined.metric_f += state.metric_f
    combined
  end
end


47
48
49
50
51
52
53
# File 'lib/ustate/aggregator.rb', line 47

def fold(query, &block)
  @folds[block] = if existing = @folds[block]
    "(#{existing}) or (#{q})"
  else
    query
  end
end

#startObject

Polls index for states matching each fold, applies fold, and inserts into index.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/ustate/aggregator.rb', line 57

def start
  @runner = Thread.new do
    loop do
      begin
        interval = (@interval.to_f / @folds.size) rescue @interval
        @folds.each do |f, query|
          matching = @index.query(Query.new(string: query))
          unless matching.empty?
            case combined = f[matching]
            when State
              @index << combined
            when Array
              combined.each do |s|
                @index << s
              end
            end
          end
          sleep interval
        end
      rescue Exception => e
        @server.log.error e
        sleep 1
      end
    end
  end
end

#sum(query, *a) ⇒ Object

Combines states matching query with State.sum



85
86
87
88
89
# File 'lib/ustate/aggregator.rb', line 85

def sum(query, *a)
  fold query do |states|
    State.sum states, *a
  end
end

#sum_over_hosts(query) ⇒ Object

Sum states with the same service across varying hosts. Useful when you don’t know the exact services you’d like to fold.



93
94
95
96
97
98
99
# File 'lib/ustate/aggregator.rb', line 93

def sum_over_hosts(query)
  fold query do |states|
    State.partition(states, :service).values.map do |slice|
      State.sum slice, {host: nil}
    end
  end
end