Module: SimpleMetrics::Importer
Instance Method Summary
- #aggregate_coarse_buckets(dp) ⇒ Object
- #flush_data_points(data_points, ts = nil) ⇒ Object
- #flush_raw(data) ⇒ Object
Instance Method Details
#aggregate_coarse_buckets(dp) ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/simple_metrics/importer.rb', line 32
#
# Folds one data point into every bucket yielded by +coarse_buckets+.
#
# For each bucket, a data point is looked up by +dp.ts+ and +dp.name+.
# If one is found, it is combined with +dp+ and written back via
# +bucket.update+ under the found point's own timestamp; otherwise +dp+
# is stored via +bucket.save+ under +dp.ts+.
#
# @param dp [Object] a data point responding to +ts+ and +name+
#   (presumably a DataPoint -- confirm against callers)
# @return [Object] whatever +coarse_buckets.each+ returns
def aggregate_coarse_buckets(dp)
  coarse_buckets.each do |coarse|
    stored = coarse.find_data_point_at_ts(dp.ts, dp.name)

    # Nothing recorded for this slot yet: store the point as-is.
    unless stored
      coarse.save(dp, dp.ts)
      next
    end

    # Merge into the existing entry, keeping the timestamp it already has.
    coarse.update(stored.combine(dp), stored.ts)
  end
end
#flush_data_points(data_points, ts = nil) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/simple_metrics/importer.rb', line 17
#
# Aggregates the given data points per group and persists the results.
#
# Does nothing when +data_points+ is empty. Otherwise logs an info line,
# then for every group yielded by +group_by_name+:
# 1. collapses the group with +DataPoint.aggregate_values+,
# 2. saves the aggregate into +Bucket.first+ under the flush timestamp,
# 3. calls +update_metric+ with the aggregate and the group size,
# 4. passes the aggregate on to +aggregate_coarse_buckets+.
#
# @param data_points [#empty?, #count] the points to flush
# @param ts [Integer, nil] timestamp to save under; when nil (or false)
#   the current UTC time in epoch seconds is used
# @return [nil, Object] nil for empty input, otherwise the return value
#   of +group_by_name+
def flush_data_points(data_points, ts = nil)
  return if data_points.empty?

  SimpleMetrics.logger.info("#{Time.now} Flushing #{data_points.count} to MongoDB")

  flush_ts = ts || Time.now.utc.to_i
  target_bucket = Bucket.first

  group_by_name(data_points) do |_name, grouped|
    aggregated = DataPoint.aggregate_values(grouped)
    target_bucket.save(aggregated, flush_ts)
    update_metric(aggregated, grouped.size)
    aggregate_coarse_buckets(aggregated)
  end
end
#flush_raw(data) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 |
# File 'lib/simple_metrics/importer.rb', line 5
#
# Parses raw entries into data points and flushes them.
#
# Every element of +data+ is handed to +DataPoint.parse+. Elements that
# raise +DataPoint::ParserError+ are logged at debug level and skipped;
# any other exception propagates. The successfully parsed points are then
# passed to +flush_data_points+ with the current UTC epoch seconds.
#
# @param data [#each] raw entries (presumably strings -- confirm against
#   the caller)
# @return [Object] the return value of +flush_data_points+
def flush_raw(data)
  parsed_points = []

  data.each do |raw|
    begin
      parsed_points.push(DataPoint.parse(raw))
    rescue DataPoint::ParserError => parse_error
      # Best effort: one malformed entry must not abort the whole batch.
      SimpleMetrics.logger.debug("Invalid Data skipped: #{raw}, #{parse_error}")
    end
  end

  flush_data_points(parsed_points, Time.now.utc.to_i)
end