Module: SimpleMetrics::Importer

Extended by:
Importer
Included in:
Importer
Defined in:
lib/simple_metrics/importer.rb

Instance Method Summary collapse

Instance Method Details

#aggregate_coarse_buckets(dp) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/simple_metrics/importer.rb', line 32

def aggregate_coarse_buckets(dp)
  coarse_buckets.each do |bucket|
    if existing_dp = bucket.find_data_point_at_ts(dp.ts, dp.name)
      bucket.update(existing_dp.combine(dp), existing_dp.ts)
    else
      bucket.save(dp, dp.ts)
    end
  end
end

#flush_data_points(data_points, ts = nil) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/simple_metrics/importer.rb', line 17

def flush_data_points(data_points, ts = nil)
  return if data_points.empty?
  SimpleMetrics.logger.info "#{Time.now} Flushing #{data_points.count} to MongoDB"

  ts   ||= Time.now.utc.to_i
  bucket = Bucket.first

  group_by_name(data_points) do |name, dps|
    dp = DataPoint.aggregate_values(dps)
    bucket.save(dp, ts)
    update_metric(dp, dps.size)
    aggregate_coarse_buckets(dp)  
  end
end

#flush_raw(data) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
# File 'lib/simple_metrics/importer.rb', line 5

def flush_raw(data)      
  data_points = []
  data.each do |str|
    begin
      data_points << DataPoint.parse(str)
    rescue DataPoint::ParserError => e
      SimpleMetrics.logger.debug "Invalid Data skipped: #{str}, #{e}"
    end
  end
  flush_data_points(data_points, Time.now.utc.to_i)
end