Class: UState::Graphite

Inherits:
Object
  • Object
show all
Defined in:
lib/ustate/graphite.rb

Constant Summary collapse

HOST =

Forwards states to Graphite.

'127.0.0.1'
PORT =
2003
INTERVAL =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(index, opts = {}) ⇒ Graphite

Returns a new instance of Graphite.



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/ustate/graphite.rb', line 13

# Builds a forwarder and immediately launches it via #start.
#
# @param index [Object] object whose #query is called by #start to obtain
#   the states to forward
# @param opts [Hash] optional settings:
#   :query    - query string handed to Query.new by #start
#   :host     - Graphite host (falls back to HOST)
#   :port     - Graphite port (falls back to PORT)
#   :server   - object whose #log is used by #start to report errors
#   :interval - seconds between forwarding passes (falls back to INTERVAL)
def initialize(index, opts = {})
  # Guards the shared socket used by #with_connection.
  @locket = Mutex.new

  @index = index
  @query, @server = opts.values_at(:query, :server)

  # Connection and timing settings fall back to the class constants.
  @host     = opts[:host] || HOST
  @port     = opts[:port] || PORT
  @interval = opts[:interval] || INTERVAL

  start
end

Instance Attribute Details

#host ⇒ Object

Returns the value of attribute host.



9
10
11
# File 'lib/ustate/graphite.rb', line 9

# Reader for the host that #connect opens its TCP socket to.
#
# @return [Object] the current value of @host
def host; @host; end

#interval ⇒ Object

Returns the value of attribute interval.



11
12
13
# File 'lib/ustate/graphite.rb', line 11

# Reader for the number of seconds #start sleeps between forwarding passes.
#
# @return [Object] the current value of @interval
def interval; @interval; end

#port ⇒ Object

Returns the value of attribute port.



10
11
12
# File 'lib/ustate/graphite.rb', line 10

# Reader for the port that #connect opens its TCP socket to.
#
# @return [Object] the current value of @port
def port; @port; end

#query ⇒ Object

Returns the value of attribute query.



8
9
10
# File 'lib/ustate/graphite.rb', line 8

# Reader for the query string that #start passes to Query.new on each pass.
#
# @return [Object] the current value of @query
def query; @query; end

Instance Method Details

#connect ⇒ Object



25
26
27
# File 'lib/ustate/graphite.rb', line 25

# Opens a fresh TCP connection to @host:@port and stores it in @socket.
#
# Any socket left over from an earlier call is closed first, so repeated
# reconnects do not pile up open file descriptors while waiting for GC.
#
# @return [TCPSocket] the newly opened socket (also stored in @socket)
# @raise [SystemCallError, SocketError] whatever TCPSocket.new raises when
#   the connection cannot be established; @socket is nil in that case
def connect
  stale = @socket
  @socket = nil
  begin
    stale.close if stale
  rescue StandardError
    # Best effort: the old socket is being discarded regardless, and a
    # failure to close it must not prevent the reconnect below.
    nil
  end

  @socket = TCPSocket.new(@host, @port)
end

#forward(state) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/ustate/graphite.rb', line 37

# Sends one state to Graphite as a single "<path> <metric> <time>" line.
#
# The state's own timestamp is kept while it is less than @interval seconds
# old; otherwise the current time is substituted.
#
# @param state [Object] responds to #time and #metric, plus whatever #path
#   reads from it
# @return [Object] whatever #with_connection returns
# @raise [ArgumentError] if the assembled line contains a newline
def forward(state)
  now = Time.now.to_i
  stamp = (now - state.time) < @interval ? state.time : now

  line = "#{path(state)} #{state.metric} #{stamp}"

  # A newline inside the line would be read as an extra message.
  raise ArgumentError, "#{line} has a newline" if line.include?("\n")

  with_connection { |sock| sock.puts line }
end

#graph(q) ⇒ Object



29
30
31
32
33
34
35
# File 'lib/ustate/graphite.rb', line 29

# Adds a query expression to the set being forwarded.
#
# The first expression becomes the query as-is; each later one is OR-ed onto
# the existing query, with both sides wrapped in parentheses.
#
# @param q [String] query expression to add
# @return [Object] the updated value of @query
def graph(q)
  @query = @query ? "(#{@query}) or (#{q})" : q
end

#path(state) ⇒ Object

Formats a state into a Graphite metric path.



60
61
62
63
64
65
66
67
# File 'lib/ustate/graphite.rb', line 60

# Formats a state into a Graphite metric path.
#
# Spaces in the service name become dots. When the state has a host, its
# dot-separated labels are reversed and prefixed to the service, so
# "www.example.com" with service "disk usage" gives
# "com.example.www.disk.usage".
#
# @param state [Object] responds to #host (may be nil) and #service
# @return [String] the dotted metric path
def path(state)
  reversed_host = state.host && state.host.split('.').reverse.join('.')
  service = state.service.tr(' ', '.')

  reversed_host ? "#{reversed_host}.#{service}" : service
end

#start ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/ustate/graphite.rb', line 69

# Launches the background thread that periodically forwards states.
#
# Each pass runs @query against @index, forwards every returned state, then
# sleeps for @interval seconds. If a pass raises a StandardError, the error
# is reported and the loop resumes after a one-second pause.
#
# Only StandardError is rescued, so non-StandardError exceptions such as
# SystemExit or NoMemoryError are no longer swallowed by the loop. When no
# :server was supplied, the error goes to Kernel#warn instead of raising
# NoMethodError on nil, which would kill the thread.
#
# @return [Thread] the forwarding thread (also stored in @runner)
def start
  @runner = Thread.new do
    loop do
      begin
        @index.query(Query.new(string: @query)).each { |state| forward state }
        sleep @interval
      rescue StandardError => e
        if @server
          @server.log.warn e
        else
          # @server is optional; fall back to stderr so the error is not lost.
          warn "#{e.class}: #{e.message}"
        end
        sleep 1
      end
    end
  end
end

#with_connection ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/ustate/graphite.rb', line 85

# Yields the Graphite socket to the block while holding @locket,
# reconnecting and retrying when the connection fails.
#
# The block runs up to four times in total. After an IOError, EPIPE,
# ECONNREFUSED or ECONNRESET the current socket is closed and dropped, and
# the next attempt reconnects inside the protected section. A failed
# reconnect therefore counts against the same retry budget instead of
# escaping from the rescue clause on the first failure.
#
# @yieldparam socket [Object] the existing @socket, or the result of #connect
# @return [Object] the block's return value
# @raise [IOError, Errno::EPIPE, Errno::ECONNREFUSED, Errno::ECONNRESET]
#   the last error once the fourth attempt has failed
def with_connection
  tries = 0
  @locket.synchronize do
    begin
      tries += 1
      yield(@socket || connect)
    rescue IOError, Errno::EPIPE, Errno::ECONNREFUSED, Errno::ECONNRESET
      raise if tries > 3

      # Discard the broken socket so the retry goes through #connect.
      stale = @socket
      @socket = nil
      begin
        stale.close if stale
      rescue StandardError
        # Best effort: the socket is already considered dead.
        nil
      end

      retry
    end
  end
end