Class: Librato::Collector::Aggregator

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/librato/collector/aggregator.rb

Overview

maintains storage of timing and measurement type measurements

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Aggregator

Returns a new instance of Aggregator.



10
11
12
13
# File 'lib/librato/collector/aggregator.rb', line 10

def initialize(options={})
  @cache = Librato::Metrics::Aggregator.new(:prefix => options[:prefix])
  @lock = Mutex.new
end

Instance Method Details

#[](key) ⇒ Object



15
16
17
# File 'lib/librato/collector/aggregator.rb', line 15

def [](key)
  fetch(key)
end

#delete_allObject



33
34
35
# File 'lib/librato/collector/aggregator.rb', line 33

def delete_all
  @lock.synchronize { @cache.clear }
end

#fetch(key, options = {}) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/librato/collector/aggregator.rb', line 19

def fetch(key, options={})
  return nil if @cache.empty?
  gauges = nil
  source = options[:source]
  @lock.synchronize { gauges = @cache.queued[:gauges] }
  gauges.each do |metric|
    if metric[:name] == key.to_s
      return metric if !source && !metric[:source]
      return metric if source.to_s == metric[:source]
    end
  end
  nil
end

#flush_to(queue) ⇒ Object

transfer all measurements to queue and reset internal status



38
39
40
41
42
43
44
45
46
# File 'lib/librato/collector/aggregator.rb', line 38

def flush_to(queue)
  queued = nil
  @lock.synchronize do
    return if @cache.empty?
    queued = @cache.queued
    @cache.clear
  end
  queue.merge!(queued) if queued
end

#measure(*args, &block) ⇒ Object Also known as: timing

Examples:

Simple measurement

measure 'sources_returned', sources.length

Simple timing in milliseconds

timing 'twitter.lookup', 2.31

Block-based timing

timing 'db.query' do
  do_my_query
end

Custom source

measure 'user.all_orders', user.order_count, :source => user.id


62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/librato/collector/aggregator.rb', line 62

def measure(*args, &block)
  options = {}
  event = args[0].to_s
  returned = nil

  # handle block or specified argument
  if block_given?
    start = Time.now
    returned = yield
    value = ((Time.now - start) * 1000.0).to_i
  elsif args[1]
    value = args[1]
  else
    raise "no value provided"
  end

  # detect options hash if present
  if args.length > 1 and args[-1].respond_to?(:each)
    options = args[-1]
  end
  source = options[:source]

  @lock.synchronize do
    if source
      @cache.add event => {:source => source, :value => value}
    else
      @cache.add event => value
    end
  end
  returned
end