Class: Statsd::Mongo

Inherits:
Object
Defined in:
lib/statsd/mongo.rb

Class Attribute Summary

  .database ⇒ Object
  .flush_interval ⇒ Object
  .hostname ⇒ Object
  .retentions ⇒ Object

Class Method Summary

  .aggregate(current_bucket) ⇒ Object
  .flush_stats(counters, timers) ⇒ Object

Class Attribute Details

.database ⇒ Object

Returns the value of attribute database.



# File 'lib/statsd/mongo.rb', line 6

def database
  @database
end

.flush_interval ⇒ Object

Returns the value of attribute flush_interval.



# File 'lib/statsd/mongo.rb', line 6

def flush_interval
  @flush_interval
end

.hostname ⇒ Object

Returns the value of attribute hostname.



# File 'lib/statsd/mongo.rb', line 6

def hostname
  @hostname
end

.retentions ⇒ Object

Returns the value of attribute retentions.



# File 'lib/statsd/mongo.rb', line 6

def retentions
  @retentions
end
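
Taken together, these class attributes tell the flusher where to write: the MongoDB host and database, the flush interval, and the retention tiers (an array of hashes with 'name' and 'seconds' keys, as read by flush_stats and aggregate below). A minimal configuration sketch, assuming the attributes are writable class-level accessors; the collection names and intervals are illustrative, not defaults:

  # Illustrative values only; adjust to your deployment.
  Statsd::Mongo.hostname       = 'localhost'
  Statsd::Mongo.database       = 'statsdb'
  Statsd::Mongo.flush_interval = 10
  Statsd::Mongo.retentions     = [
    { 'name' => 'stats_per_10s',  'seconds' => 10   },  # finest tier: written directly by flush_stats
    { 'name' => 'stats_per_1min', 'seconds' => 60   },  # coarser tiers: filled by aggregate
    { 'name' => 'stats_per_1h',   'seconds' => 3600 }
  ]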

Class Method Details

.aggregate(current_bucket) ⇒ Object

For each coarse retention granularity, look up the previous bucket; if it holds no data, fill it by aggregating the documents from the finest-granularity collection.

TODO: consider doing this inside Mongo with map/reduce (M/R).
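
A worked sketch of the bucket arithmetic used in the body below (the variable names mirror the local variables; the timestamps are made up):

  current_bucket = 1_361_000_050   # a finest-granularity bucket, e.g. 10 s steps
  step           = 60              # coarse retention 'seconds'

  current_coarse_bucket  = current_bucket / step * step - step   # => 1360999980
  previous_coarse_bucket = current_coarse_bucket - step          # => 1360999920
  # Fine documents with previous_coarse_bucket <= ts < current_coarse_bucket are
  # rolled up into one coarse document per stat and inserted into the coarse collection.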



# File 'lib/statsd/mongo.rb', line 86

def self.aggregate(current_bucket)
  db = ::Mongo::Connection.new(hostname).db(database)
  retentions.sort_by! {|r| r['seconds']}
  docs = []
  fine_stats_collection = db.collection(retentions.first['name']) # Use the finest granularity for now
  retentions[1..-1].each_with_index do |retention,index|
    # fine_stats_collection = db.collection(retentions[index]['name'])
    coarse_stats_collection = db.collection(retention['name'])
    puts "Aggregating #{retention['name']}"
    step = retention['seconds']
    current_coarse_bucket = current_bucket / step * step - step
    previous_coarse_bucket = current_coarse_bucket - step
    puts "#{Time.at(previous_coarse_bucket)}..#{Time.at(current_coarse_bucket)}"
    # Look up previous bucket
    if coarse_stats_collection.find({:ts => previous_coarse_bucket}).count == 0
      # Aggregate
      print '.'
      stats_to_aggregate = fine_stats_collection.find(
        {:ts => {"$gte" => previous_coarse_bucket, "$lt" => current_coarse_bucket}})
      rows = stats_to_aggregate.to_a
      count = rows.count
      rows.group_by {|r| r["stat"] }.each_pair do |name,stats|
        case stats.first['type']
        when 'timer' 
          mean = stats.collect {|stat| stat['values']['mean'] }.inject( 0 ) { |s,x| s+x } / stats.count
          max  = stats.collect {|stat| stat['values']['max'] }.max
          min  = stats.collect {|stat| stat['values']['min'] }.min
          upper_key = stats.first['values'].keys.find{|k| k =~ /upper_/}
          max_at_threshold = stats.collect {|stat| stat['values'][upper_key] }.max
          total_stats = stats.collect {|stat| stat['values']['count'] }.inject( 0 ) { |s,x| s+x }            
          doc = { :stat => name, 
            :values => {
              :mean => mean,
              :max => max,
              :min => min,
              upper_key.to_sym => max_at_threshold,
              :count => total_stats
            },
            :type => "timer",
            :ts => previous_coarse_bucket
          }
        when 'counter'  
          doc = {:stat => name, 
            :value => stats.collect {|stat| stat['value'] }.inject( 0 ) { |s,x| s+x }, 
            :ts => previous_coarse_bucket, 
            :type => "counter"
          }
        else
          raise "unknown type #{stats.first['type']}"
        end
        docs.push(doc) 
      end
      coarse_stats_collection.insert(docs) unless docs.empty?
    end
  end
  
end
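
For reference, the aggregated documents have the same shape as those written by flush_stats; a sketch of one coarse timer document and one coarse counter document (stat names and numbers are illustrative):

  { :stat => 'api.request_time', :type => 'timer', :ts => 1360999920,
    :values => { :mean => 120, :max => 340, :min => 15, :upper_90 => 210, :count => 42 } }

  { :stat => 'api.requests', :type => 'counter', :ts => 1360999920, :value => 42 }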

.flush_stats(counters, timers) ⇒ Object



# File 'lib/statsd/mongo.rb', line 9

def self.flush_stats(counters, timers)
  raise 'Invalid retention config' if retentions.empty?
  print "#{Time.now} Flushing #{counters.count} counters and #{timers.count} timers to MongoDB"
  stat_string = ''
  time = ::Benchmark.realtime do
    docs = []
    ts = Time.now.to_i
    num_stats = 0        
    retention = retentions.first # always write at the finest granularity
    ts_bucket = ts / retention['seconds'].to_i * retention['seconds'].to_i
    
    # connect to store
    db = ::Mongo::Connection.new(hostname).db(database)
    coll = db.collection(retention['name'])

    # store counters
    counters.each_pair do |key,value|
      doc = {:stat => key, :value => value, :ts => ts_bucket, :type => "counter" }
      docs.push(doc)
      counters[key] = 0

      num_stats += 1
    end

    # store timers
    timers.each_pair do |key, values|
      if (values.length > 0) 
        pct_threshold = 90
        values.sort!
        count = values.count
        min = values.first
        max = values.last

        mean = min
        max_at_threshold = max

        if (count > 1)
          # strip off the top 100-threshold
          threshold_index = (((100 - pct_threshold) / 100.0) * count).round
          values = values[0..-threshold_index]
          max_at_threshold = values.last

          # average the remaining timings
          sum = values.inject( 0 ) { |s,x| s+x }
          mean = sum / values.count
        end

        timers[key] = []
      
        # Flush Values to Store
        doc = { :stat => key, 
          :values => {
            :mean => mean,
            :max => max,
            :min => min,
            "upper_#{pct_threshold}".to_sym => max_at_threshold,
            :count => count
          },
          :type => "timer",
          :ts => ts_bucket
        }
        docs.push(doc)
      
        num_stats += 1
      end
    end
    coll.insert(docs)
    
    aggregate(ts_bucket)
  end
  puts "complete. (#{time.round(3)}s)"
end
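
A minimal invocation sketch, assuming the class attributes above are configured, that counters maps stat names to totals, and that timers maps stat names to arrays of timings collected since the last flush (the stat names and values are hypothetical):

  counters = { 'api.requests' => 42, 'api.errors' => 3 }
  timers   = { 'api.request_time' => [12, 48, 97, 15, 220] }

  Statsd::Mongo.flush_stats(counters, timers)
  # Both hashes are reset in place (counters zeroed, timer arrays emptied),
  # one document per stat is inserted into the finest retention collection,
  # and aggregate is called for the current bucket.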