Class: GraphiteAPI::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/graphite-api/buffer.rb

Constant Summary collapse

# Single-element list holding the carriage-return character.
# NOTE(review): presumably the characters the stream parser drops — confirm
# against invalid_char?, which is not shown here.
IGNORE = ["\r"]
# The newline character.
# NOTE(review): presumably the record terminator checked by closed_stream? — confirm.
END_OF_STREAM = "\n"
VALID_MESSAGE =

Matches the metric name (string with dots and dashes), at least one space, the metric value (int or float), at least one space and finally the metric timestamp (int)

/^[\w.-]+ +\d+(?:\.\d+)? +\d+$/
# Aggregation strategies keyed by name. Each entry is a lambda taking the raw
# samples as splatted arguments.
#   :sum     - adds the samples left to right, starting from 0
#   :avg     - that same sum divided by the sample count (divisor is 1 when
#              no samples are given, so the result is 0 rather than an error)
#   :replace - the last sample given (nil when called with none)
AGGREGATORS =
{
  sum: lambda { |*args| args.inject(0) { |total, sample| total + sample } },
  avg: lambda { |*args|
    total = args.inject(0) { |acc, sample| acc + sample }
    total / (args.empty? ? 1 : args.size)
  },
  replace: lambda { |*args| args[-1] },
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options, timers = false) ⇒ Buffer

Returns a new instance of Buffer.



38
39
40
41
42
43
# File 'lib/graphite-api/buffer.rb', line 38

# Sets up an empty buffer.
#
# options - configuration object, stored as-is; options[:cache] is read here
#           to decide whether a cache is created.
# timers  - forwarded unchanged to Cache::Memory.new (defaults to false).
#
# @queue holds pushed records; @streamer maps a key to a String and lazily
# stores an empty String the first time an unknown key is read.
# @cache is only assigned when options[:cache] is truthy.
def initialize(options, timers = false)
  @options  = options
  @queue    = Queue.new
  @streamer = Hash.new { |buffers, key| buffers[key] = "" }

  if options[:cache]
    @cache = Cache::Memory.new(options, timers)
  end
end

Instance Attribute Details

#cache ⇒ Object (readonly)

Returns the value of attribute cache.



45
46
47
# File 'lib/graphite-api/buffer.rb', line 45

# Reader for @cache. The constructor shown on this page assigns @cache only
# when options[:cache] is truthy, so this returns nil when no cache was set up.
def cache
  @cache
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



45
46
47
# File 'lib/graphite-api/buffer.rb', line 45

# Reader for @options, the options object that was handed to the constructor.
def options
  @options
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



45
46
47
# File 'lib/graphite-api/buffer.rb', line 45

# Reader for @queue, the Queue instance created in the constructor. #push
# appends records to it and #pull pops them off.
def queue
  @queue
end

#streamer ⇒ Object (readonly)

Returns the value of attribute streamer.



45
46
47
# File 'lib/graphite-api/buffer.rb', line 45

# Reader for @streamer, the Hash created in the constructor whose default
# block stores an empty String for a key on first read. #stream keys it by
# client_id to accumulate not-yet-terminated input.
def streamer
  @streamer
end

Instance Method Details

#inspect ⇒ Object



98
99
100
101
# File 'lib/graphite-api/buffer.rb', line 98

# Compact diagnostic representation of the buffer: the object id, the number
# of records currently waiting in the queue, and the streamer hash.
#
# Returns a String.
def inspect
  # The label previously read "@quque#size"; it reports the size of the queue.
  "#<GraphiteAPI::Buffer:%s @queue#size=%s @streamer=%s>" %
    [object_id, queue.size, streamer]
end

#new_records? ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/graphite-api/buffer.rb', line 103

# Tells whether at least one record is waiting in the queue.
#
# Returns true when the queue is non-empty, false otherwise.
def new_records?
  queue.empty? ? false : true
end

#pull(format = nil) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/graphite-api/buffer.rb', line 73

def pull format = nil
  data = Hash.new { |h,time| h[time] = Hash.new { |h2,metric| h2[metric] = cache_get(time, metric) } }
  aggregation_methods = Hash.new

  counter = 0
  while new_records? and (counter += 1) < 1_000_000
    metrics, time, method_name = queue.pop.values_at(:metric, :time, :aggregation_method)

    normalized_time = normalize_time(time, options[:slice])
    metrics.each do |metric, value|
      aggregation_methods[metric] = method_name || options[:default_aggregation_method]
      data[normalized_time][metric].push value.to_f
      cache_set(normalized_time, metric, data[normalized_time][metric])
    end
  end

  data.map do |time, metrics|
    metrics.map do |metric, raw_values|
      value = AGGREGATORS[aggregation_methods[metric]].call(*raw_values)
      results = ["#{prefix}#{metric}",("%f"%value).to_f, time]
      format == :string ? results.join(" ") : results
    end
  end.flatten(1)
end

#push(obj) ⇒ Object Also known as: <<

Add a record to the buffer, e.g. push(:metric => {'a' => 10}, :time => Time.now, :aggregation_method => :sum)



65
66
67
68
69
# File 'lib/graphite-api/buffer.rb', line 65

# Appends one record to the queue after logging it at debug level.
#
# obj - the record to enqueue; it is stored untouched.
#
# Always returns nil, so callers never get the queue back.
def push(obj)
  Logger.debug([:buffer, :add, obj])
  queue << obj
  nil
end

#stream(message, client_id = nil) ⇒ Object

This method isn’t thread-safe; use #push when multiple threads need to add records.



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/graphite-api/buffer.rb', line 49

# Feeds raw text into the per-client partial-message buffer, one character at
# a time, pushing a record each time a complete, valid message is assembled.
#
# message   - incoming text; tabs are treated as spaces.
# client_id - key under which the partial message is accumulated in streamer
#             (defaults to nil, i.e. one shared buffer).
#
# Characters rejected by invalid_char? are dropped. Once closed_stream? accepts
# the accumulated text, it is pushed (via stream_message_to_obj) only if it
# matches VALID_MESSAGE, and the client's buffer is discarded either way.
#
# Not thread-safe; use #push directly when several threads add records.
#
# Returns the tab-normalized copy of message.
def stream(message, client_id = nil)
  message.gsub(/\t/,' ').each_char do |char|
    next if invalid_char?(char)

    streamer[client_id] += char
    pending = streamer[client_id]
    next unless closed_stream?(pending)

    push(stream_message_to_obj(pending)) if pending =~ VALID_MESSAGE
    streamer.delete(client_id)
  end
end