Class: StatsdServer

Inherits:
Object
  • Object
show all
Defined in:
lib/statsdserver.rb,
lib/statsdserver/math.rb,
lib/statsdserver/stats.rb,
lib/statsdserver/proto/v1.rb,
lib/statsdserver/input/udp.rb,
lib/statsdserver/input/zeromq.rb,
lib/statsdserver/proto/parseerror.rb

Defined Under Namespace

Modules: Math Classes: Input, Output, Proto, Stats

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts, input_config, output_config) ⇒ StatsdServer

Returns a new instance of StatsdServer.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/statsdserver.rb', line 20

# Sets up a server instance: an empty stats store, a STDERR logger, and
# the effective option set (built-in defaults overridden by +opts+).
#
# @param opts [Hash] option overrides merged on top of the defaults
# @param input_config [Object] stored as-is in @input_config
# @param output_config [Object] stored as-is in @output_config
# @raise [RuntimeError] if :port, :percentile or :flush_interval cannot
#   be converted with Float()
def initialize(opts, input_config, output_config)
  @stats = StatsdServer::Stats.new

  @logger = Logger.new(STDERR)
  @logger.progname = File.basename($0)

  defaults = {
    :bind => "127.0.0.1",
    :port => 8125,
    :percentile => 90,
    :flush_interval => 30,
    :prefix => "stats",
    :preserve_counters => "true",
    :timer_names_before_suffix => "true",
  }
  @opts = defaults.merge(opts)

  @input_config = input_config
  @output_config = output_config

  # Numeric options are normalised to Float; anything Float() rejects is
  # reported with the offending key and value.
  numeric_keys = [:port, :percentile, :flush_interval]
  numeric_keys.each do |name|
    raw = @opts[name]
    begin
      @opts[name] = Float(raw)
    rescue StandardError
      raise "#{name}: #{raw.inspect}: must be a valid number"
    end
  end
end

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



16
17
18
# File 'lib/statsdserver.rb', line 16

# Reader for the logger attribute.
#
# @return [Object] whatever is currently stored in @logger
def logger
  return @logger
end

#stats ⇒ Object

Returns the value of attribute stats.



17
18
19
# File 'lib/statsdserver.rb', line 17

# Reader for the stats attribute.
#
# @return [Object] whatever is currently stored in @stats
def stats
  return @stats
end

Instance Method Details

#carbon_update_str ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/statsdserver.rb', line 164

# Builds the text payload for one flush: one "name value timestamp" line
# per metric, newline-terminated.
#
# Side effects on @stats:
# * timers and counters are removed as they are read;
# * gauges are read but left in place;
# * when @opts[:preserve_counters] is "true", every counter that was just
#   read is put back with a value of 0 unless it has been set again.
#
# Timer names are built in one of two ways, chosen by
# @opts[:timer_names_before_suffix]: when "true" the suffix (".mean",
# ".upper", ...) is part of the string handed to metric_name; otherwise
# it is appended to metric_name's result.
#
# @return [String, nil] the payload, or nil when there is nothing to send
def carbon_update_str
  updates = []
  now = Time.now.to_i

  timers = {}
  gauges = {}
  counters = {}

  # Iterate over a snapshot of the keys so entries can be deleted while
  # walking the collection.
  @stats.timers.keys.each do |k|
    timers[k] = @stats.timers.delete(k)
  end

  # Gauges are copied, not deleted, so they stay in @stats after a flush.
  @stats.gauges.keys.each do |k|
    gauges[k] = @stats.gauges[k]
  end

  @stats.counters.keys.each do |k|
    counters[k] = @stats.counters.delete(k)
  end

  if @opts[:preserve_counters] == "true"
    # Keep sending a 0 for counters (even if we don't get updates)
    counters.each_key do |k|
      @stats.counters[k] ||= 0
    end
  end

  suffix_inside_name = @opts[:timer_names_before_suffix] == "true"

  timers.each do |key, values|
    next if values.empty?
    summary = ::StatsdServer::Math.summarize(values, @opts)
    base = "timers.#{key}"

    # Emission order is fixed: mean, upper, lower, count, upper_<percentile>.
    [
      ["mean", summary[:mean]],
      ["upper", summary[:max]],
      ["lower", summary[:min]],
      ["count", values.length],
      ["upper_#{@opts[:percentile].to_i}", summary[:max_at_threshold]],
    ].each do |suffix, value|
      name = if suffix_inside_name
               metric_name("#{base}.#{suffix}")
             else
               metric_name(base) + ".#{suffix}"
             end
      updates << [name, value, now].join(" ")
    end
  end # timers.each

  # Counters are reported as the count divided by the flush interval.
  counters.each do |key, value|
    updates << [metric_name(key),
                value / @opts[:flush_interval],
                now].join(" ")
  end # counters.each

  gauges.each do |key, value|
    updates << [metric_name(key),
                value,
                now].join(" ")
  end # gauges.each

  updates.empty? ? nil : updates.join("\n") + "\n"
end

#flush ⇒ Object



236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/statsdserver.rb', line 236

# Hands the current update payload to every configured output.
#
# Returns immediately when carbon_update_str produces nothing. When no
# outputs exist (nil or empty @outputs) a warning is logged and the
# payload is dropped.
def flush
  payload = carbon_update_str
  return unless payload

  if @outputs.nil? || @outputs.empty?
    @logger.warn("no outputs configured, can't flush data")
    nil
  else
    @outputs.each { |sink| sink.send(payload) }
  end
end

#run ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/statsdserver.rb', line 48

# Brings the server up: instantiates every configured output, opens every
# configured input, and starts the background flusher thread.
#
# An unknown output or input name is logged at fatal level and the
# process exits with EX_DATAERR.
#
# @return [Thread] the flusher thread (also kept in @flusher)
def run
  # initialize outputs
  @outputs = []
  @output_config.each do |output, config|
    # const_get raises NameError for a missing or malformed constant name
    # rather than returning nil, so map that to nil to reach the
    # "unknown output" handling below.
    klass = begin
      StatsdServer::Output.const_get(output.capitalize)
    rescue NameError
      nil
    end
    if klass.nil?
      @logger.fatal("unknown output #{output.inspect}")
      exit EX_DATAERR
    end
    @outputs << klass.new(config)
  end # @output_config.each

  # start inputs
  @input_config.each do |input, config|
    case input
    when "udp"
      EM.open_datagram_socket(config["bind"], config["port"].to_i,
                              Input::Udp) do |s|
        s.logger = @logger
        s.stats = @stats
      end # EM.open_datagram_socket
    when "zeromq"
      s = Input::ZeroMQ.new
      s.logger = @logger
      s.stats = @stats
      # NOTE(review): the context is stored in a global; whether other
      # code reads $ctx is not visible here, so it is left as-is.
      $ctx = EM::ZeroMQ::Context.new(1)
      sock = $ctx.socket(ZMQ::PULL, s)
      sock.setsockopt(ZMQ::HWM, 100)
      sock.bind(config["bind"])
    else
      @logger.fatal("unknown input #{input.inspect}")
      exit EX_DATAERR
    end # case input
  end # @input_config.each

  # start flusher
  # Process-wide setting: an exception escaping any thread aborts the
  # process. StandardErrors raised by flush are rescued below and logged.
  Thread.abort_on_exception = true
  @flusher = Thread.new do
    while sleep(@opts[:flush_interval])
      begin
        flush
      rescue => e
        @logger.warn("trouble flushing: #{e}")
        @logger.debug(e.backtrace.join("\n"))
      end
    end
  end
end