Class: Statsd::Mongo
- Inherits:
-
Object
- Object
- Statsd::Mongo
- Defined in:
- lib/statsd/mongo.rb
Class Attribute Summary collapse
-
.database ⇒ Object
Returns the value of attribute database.
-
.flush_interval ⇒ Object
Returns the value of attribute flush_interval.
-
.hostname ⇒ Object
Returns the value of attribute hostname.
-
.retentions ⇒ Object
Returns the value of attribute retentions.
Class Method Summary collapse
-
.aggregate(current_bucket) ⇒ Object
For each coarser retention granularity, look up the previous bucket; if it holds no data, fill it by aggregating the finest-granularity stats. TODO: consider doing this inside Mongo with map/reduce.
- .flush_stats(counters, timers) ⇒ Object
Class Attribute Details
.database ⇒ Object
Returns the value of attribute database.
6 7 8 |
# Class-level reader: returns @database. Elsewhere in this class the value is passed to `.db(...)` on a new ::Mongo::Connection.
# File 'lib/statsd/mongo.rb', line 6 def database @database end |
.flush_interval ⇒ Object
Returns the value of attribute flush_interval.
6 7 8 |
# Class-level reader: returns @flush_interval. The methods shown on this page do not read it.
# File 'lib/statsd/mongo.rb', line 6 def flush_interval @flush_interval end |
.hostname ⇒ Object
Returns the value of attribute hostname.
6 7 8 |
# Class-level reader: returns @hostname. Elsewhere in this class the value is passed to ::Mongo::Connection.new.
# File 'lib/statsd/mongo.rb', line 6 def hostname @hostname end |
.retentions ⇒ Object
Returns the value of attribute retentions.
6 7 8 |
# Class-level reader: returns @retentions. Elsewhere in this class each entry is read as a hash
# with a 'name' key (used as the collection name) and a 'seconds' key (used as the bucket width).
# File 'lib/statsd/mongo.rb', line 6 def retentions @retentions end |
Class Method Details
.aggregate(current_bucket) ⇒ Object
For each coarse granularity of retention
Look up the previous bucket
If there is no data, fill it by aggregating the finest-granularity stats
TODO consider doing this inside Mongo with M/R
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/statsd/mongo.rb', line 86

# Rolls the finest-granularity stats up into every coarser retention collection.
#
# For each retention other than the finest one, a coarse bucket is derived from
# +current_bucket+. If the coarse collection holds no document with that bucket's
# timestamp, the finest-granularity documents inside the bucket are grouped by
# stat name, combined, and inserted:
#
# * timers   - unweighted mean of the stored means, max of maxes, min of mins,
#              max of the "upper_<pct>" values (when present) and sum of counts
# * counters - sum of the stored values
#
# @param current_bucket [Integer] timestamp of the bucket that was just flushed
# @raise [RuntimeError] if a stored document has a type other than 'timer' or 'counter'
def self.aggregate(current_bucket)
  db = ::Mongo::Connection.new(hostname).db(database)
  # sort_by! intentionally reorders the shared retentions list in place (finest first).
  ordered = retentions.sort_by! { |r| r['seconds'] }
  # Use the finest granularity as the aggregation source for now.
  fine_stats_collection = db.collection(ordered.first['name'])

  ordered.drop(1).each do |retention|
    coarse_stats_collection = db.collection(retention['name'])
    puts "Aggregating #{retention['name']}"
    step = retention['seconds']
    # Exclusive upper bound of the window: one full step before the coarse
    # bucket that contains current_bucket.
    current_coarse_bucket = current_bucket / step * step - step
    # Inclusive lower bound of the window; also the ts stored on aggregated docs.
    previous_coarse_bucket = current_coarse_bucket - step
    puts "#{Time.at(previous_coarse_bucket)}..#{Time.at(current_coarse_bucket)}"

    # Only fill the bucket when nothing has been stored for it yet.
    next unless coarse_stats_collection.find({:ts => previous_coarse_bucket}).count == 0

    print '.'
    rows = fine_stats_collection.find(
      {:ts => {"$gte" => previous_coarse_bucket, "$lt" => current_coarse_bucket}}
    ).to_a

    # Build a fresh document list per retention so that one collection never
    # receives documents that were aggregated for another retention.
    docs = rows.group_by { |r| r['stat'] }.map do |name, stats|
      case stats.first['type']
      when 'timer'
        samples = stats.map { |stat| stat['values'] }
        upper_key = samples.first.keys.find { |k| k =~ /upper_/ }
        combined = {
          :mean => samples.map { |v| v['mean'] }.inject(0) { |s, x| s + x } / stats.count,
          :max => samples.map { |v| v['max'] }.max,
          :min => samples.map { |v| v['min'] }.min
        }
        # Carry the percentile field over only when the source documents have one.
        combined[upper_key.to_sym] = samples.map { |v| v[upper_key] }.max if upper_key
        combined[:count] = samples.map { |v| v['count'] }.inject(0) { |s, x| s + x }
        { :stat => name, :values => combined, :type => "timer", :ts => previous_coarse_bucket }
      when 'counter'
        { :stat => name,
          :value => stats.map { |stat| stat['value'] }.inject(0) { |s, x| s + x },
          :ts => previous_coarse_bucket,
          :type => "counter" }
      else
        raise "unknown type #{stats.first['type']}"
      end
    end

    coarse_stats_collection.insert(docs) unless docs.empty?
  end
end
.flush_stats(counters, timers) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/statsd/mongo.rb', line 9

# Writes the accumulated counters and timers to the finest-granularity
# collection (the first entry of +retentions+) and then calls +aggregate+
# with the bucket timestamp that was written.
#
# Both arguments are reset in place: every counter is set to 0 and every
# non-empty timer list is replaced with an empty array.
#
# For each timer the stored values are min, max and count over all samples,
# plus "upper_90" (largest sample after dropping the top 10%) and the mean of
# the samples that remain after that trim.
#
# @param counters [Hash] stat name => accumulated counter value
# @param timers [Hash] stat name => Array of recorded timings
# @raise [RuntimeError] 'Invalid retention config' when no retentions are configured
def self.flush_stats(counters, timers)
  raise 'Invalid retention config' if retentions.empty?

  print "#{Time.now} Flushing #{counters.count} counters and #{timers.count} timers to MongoDB"
  pct_threshold = 90
  time = ::Benchmark.realtime do
    docs = []
    retention = retentions.first # always write at the finest granularity
    seconds = retention['seconds'].to_i
    # Round the current time down to the start of its bucket.
    ts_bucket = Time.now.to_i / seconds * seconds

    # connect to store
    db = ::Mongo::Connection.new(hostname).db(database)
    coll = db.collection(retention['name'])

    # store counters
    counters.each_pair do |key, value|
      docs.push({ :stat => key, :value => value, :ts => ts_bucket, :type => "counter" })
      counters[key] = 0
    end

    # store timers
    timers.each_pair do |key, values|
      next if values.empty?

      values.sort!
      count = values.count
      min = values.first
      max = values.last
      mean = min
      max_at_threshold = max
      if count > 1
        # Number of samples to strip off the top (100 - threshold percent).
        threshold_index = (((100 - pct_threshold) / 100.0) * count).round
        # Slice by length: a range ending in -threshold_index keeps only the
        # first sample when the index is 0 and strips nothing when it is 1.
        values = values[0, count - threshold_index]
        max_at_threshold = values.last
        # average the remaining timings
        mean = values.inject(0) { |s, x| s + x } / values.count
      end
      timers[key] = []

      docs.push({
        :stat => key,
        :values => {
          :mean => mean,
          :max => max,
          :min => min,
          "upper_#{pct_threshold}".to_sym => max_at_threshold,
          :count => count
        },
        :type => "timer",
        :ts => ts_bucket
      })
    end

    # Nothing to write on an idle interval; aggregation still runs.
    coll.insert(docs) unless docs.empty?
    aggregate(ts_bucket)
  end
  puts "complete. (#{time.round(3)}s)"
end