Class: Librato::Collector::Aggregator
- Inherits:
-
Object
- Object
- Librato::Collector::Aggregator
- Extended by:
- Forwardable
- Defined in:
- lib/librato/collector/aggregator.rb
Overview
maintains storage of timing and measurement type measurements
Instance Method Summary collapse
- #[](key) ⇒ Object
- #delete_all ⇒ Object
- #fetch(key, options = {}) ⇒ Object
-
#flush_to(queue) ⇒ Object
transfer all measurements to queue and reset internal status.
-
#initialize(options = {}) ⇒ Aggregator
constructor
A new instance of Aggregator.
- #measure(*args, &block) ⇒ Object (also: #timing)
Constructor Details
#initialize(options = {}) ⇒ Aggregator
Returns a new instance of Aggregator.
10 11 12 13 |
# File 'lib/librato/collector/aggregator.rb', line 10 def initialize(={}) @cache = Librato::Metrics::Aggregator.new(:prefix => [:prefix]) @lock = Mutex.new end |
Instance Method Details
#[](key) ⇒ Object
15 16 17 |
# File 'lib/librato/collector/aggregator.rb', line 15 def [](key) fetch(key) end |
#delete_all ⇒ Object
33 34 35 |
# File 'lib/librato/collector/aggregator.rb', line 33 def delete_all @lock.synchronize { @cache.clear } end |
#fetch(key, options = {}) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/librato/collector/aggregator.rb', line 19 def fetch(key, ={}) return nil if @cache.empty? gauges = nil source = [:source] @lock.synchronize { gauges = @cache.queued[:gauges] } gauges.each do |metric| if metric[:name] == key.to_s return metric if !source && !metric[:source] return metric if source.to_s == metric[:source] end end nil end |
#flush_to(queue) ⇒ Object
transfer all measurements to queue and reset internal status
38 39 40 41 42 43 44 45 46 |
# File 'lib/librato/collector/aggregator.rb', line 38 def flush_to(queue) queued = nil @lock.synchronize do return if @cache.empty? queued = @cache.queued @cache.clear end queue.merge!(queued) if queued end |
#measure(*args, &block) ⇒ Object Also known as: timing
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/librato/collector/aggregator.rb', line 62 def measure(*args, &block) = {} event = args[0].to_s returned = nil # handle block or specified argument if block_given? start = Time.now returned = yield value = ((Time.now - start) * 1000.0).to_i elsif args[1] value = args[1] else raise "no value provided" end # detect options hash if present if args.length > 1 and args[-1].respond_to?(:each) = args[-1] end source = [:source] @lock.synchronize do if source @cache.add event => {:source => source, :value => value} else @cache.add event => value end end returned end |