Class: GraphiteAPI::Buffer
- Inherits:
-
Object
- Object
- GraphiteAPI::Buffer
- Defined in:
- lib/graphite-api/buffer.rb
Constant Summary collapse
- IGNORE =
["\r"]
- END_OF_STREAM =
"\n"- VALID_MESSAGE =
Matches the metric name (string with dots and dashes), at least one space, the metric value (int or float), at least one space and finally the metric timestamp (int)
/^[\w.-]+ +\d+(?:\.\d+)? +\d+$/- AGGREGATORS =
{ sum: ->(*args) { args.reduce(0) { |sum, x| sum + x } }, avg: ->(*args) { args.reduce(0) { |sum, x| sum + x } / [args.length, 1].max }, replace: ->(*args) { args.last }, }
Instance Attribute Summary collapse
-
#cache ⇒ Object
readonly
Returns the value of attribute cache.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#streamer ⇒ Object
readonly
Returns the value of attribute streamer.
Instance Method Summary collapse
-
#initialize(options, timers = false) ⇒ Buffer
constructor
A new instance of Buffer.
- #inspect ⇒ Object
- #new_records? ⇒ Boolean
- #pull(format = nil) ⇒ Object
-
#push(obj) ⇒ Object
(also: #<<)
Add records to buffer push(=> {‘a’ => 10,:time => Time.now,:aggregation_method => :sum}).
-
#stream(message, client_id = nil) ⇒ Object
this method isn’t thread safe use #push for multiple threads support.
Constructor Details
Instance Attribute Details
#cache ⇒ Object (readonly)
Returns the value of attribute cache.
45 46 47 |
# File 'lib/graphite-api/buffer.rb', line 45 def cache @cache end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
45 46 47 |
# File 'lib/graphite-api/buffer.rb', line 45 def @options end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
45 46 47 |
# File 'lib/graphite-api/buffer.rb', line 45 def queue @queue end |
#streamer ⇒ Object (readonly)
Returns the value of attribute streamer.
45 46 47 |
# File 'lib/graphite-api/buffer.rb', line 45 def streamer @streamer end |
Instance Method Details
#inspect ⇒ Object
98 99 100 101 |
# File 'lib/graphite-api/buffer.rb', line 98 def inspect "#<GraphiteAPI::Buffer:%s @quque#size=%s @streamer=%s>" % [ object_id, queue.size, streamer] end |
#new_records? ⇒ Boolean
103 104 105 |
# File 'lib/graphite-api/buffer.rb', line 103 def new_records? !queue.empty? end |
#pull(format = nil) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/graphite-api/buffer.rb', line 73 def pull format = nil data = Hash.new { |h,time| h[time] = Hash.new { |h2,metric| h2[metric] = cache_get(time, metric) } } aggregation_methods = Hash.new counter = 0 while new_records? and (counter += 1) < 1_000_000 metrics, time, method_name = queue.pop.values_at(:metric, :time, :aggregation_method) normalized_time = normalize_time(time, [:slice]) metrics.each do |metric, value| aggregation_methods[metric] = method_name || [:default_aggregation_method] data[normalized_time][metric].push value.to_f cache_set(normalized_time, metric, data[normalized_time][metric]) end end data.map do |time, metrics| metrics.map do |metric, raw_values| value = AGGREGATORS[aggregation_methods[metric]].call(*raw_values) results = ["#{prefix}#{metric}",("%f"%value).to_f, time] format == :string ? results.join(" ") : results end end.flatten(1) end |
#push(obj) ⇒ Object Also known as: <<
Add records to buffer push(=> {‘a’ => 10,:time => Time.now,:aggregation_method => :sum})
65 66 67 68 69 |
# File 'lib/graphite-api/buffer.rb', line 65 def push obj Logger.debug [:buffer,:add, obj] queue.push obj nil end |
#stream(message, client_id = nil) ⇒ Object
this method isn’t thread safe use #push for multiple threads support
49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/graphite-api/buffer.rb', line 49 def stream , client_id = nil .gsub(/\t/,' ').each_char do |char| next if invalid_char? char streamer[client_id] += char if closed_stream? streamer[client_id] if streamer[client_id] =~ VALID_MESSAGE push streamer[client_id] end streamer.delete client_id end end end |