Class: Wavefront::BatchWriter

Inherits:
Object
  • Object
show all
Includes:
Constants
Defined in:
lib/wavefront/batch_writer.rb

Overview

This class exists to facilitate sending of multiple data points to a Wavefront proxy. It sends points in native Wavefront format.

When initializing the instance you can define point tags which will apply to all points sent via that instance.

Though we provide methods to do it, it is the developer’s responsibility to open and close the socket to the proxy. Points are sent by calling the write() method.

The class keeps a count of the points the current instance has sent, dropped, and failed to send, in @summary. The socket is accessed through the instance variable @sock.

Constant Summary

Constants included from Constants

Constants::ALERT_FORMATS, Constants::DEFAULT_ALERT_FORMAT, Constants::DEFAULT_FORMAT, Constants::DEFAULT_HOST, Constants::DEFAULT_INFILE_FORMAT, Constants::DEFAULT_OBSOLETE_METRICS, Constants::DEFAULT_PERIOD_SECONDS, Constants::DEFAULT_PREFIX_LENGTH, Constants::DEFAULT_PROXY, Constants::DEFAULT_PROXY_PORT, Constants::DEFAULT_STRICT, Constants::EVENT_LEVELS, Constants::EVENT_STATE_DIR, Constants::FORMATS, Constants::GRANULARITIES

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ BatchWriter

Returns a new instance of BatchWriter.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/wavefront/batch_writer.rb', line 31

def initialize(options = {})
  #
  # options is of the form:
  #
  # {
  #   tags:       a key-value hash of tags which will be applied to
  #               every  point
  #   proxy:      the address of the Wavefront proxy
  #   port:       the port of the Wavefront proxy
  #   noop:       if this is true, no proxy connection will be made,
  #               and instead of sending the points, they will
  #               be printed in Wavefront wire format.
  #   novalidate: if this is true, points will not be validated.
  #               This might make things go marginally quicker
  #               if you have done point validation higher up in
  #               the chain.
  #   verbose:    if this is true, many of the methods will report
  #               their progress.
  #   debug:      if this is true, debugging output will be
  #               printed.
  # }
  #
  defaults = {
    tags:       false,
    proxy:      DEFAULT_PROXY,
    port:       DEFAULT_PROXY_PORT,
    noop:       false,
    novalidate: false,
    verbose:    false,
    debug:      false,
  }

  @summary = { sent:     0,
               rejected: 0,
               unsent:   0,
             }
  @opts = setup_options(options, defaults)

  if opts[:tags]
    valid_tags?(opts[:tags])
    @global_tags = opts[:tags]
  end
end

Instance Attribute Details

#optsObject (readonly)

Returns the value of attribute opts.



28
29
30
# File 'lib/wavefront/batch_writer.rb', line 28

def opts
  @opts
end

#sockObject (readonly)

Returns the value of attribute sock.



28
29
30
# File 'lib/wavefront/batch_writer.rb', line 28

def sock
  @sock
end

#summaryObject (readonly)

Returns the value of attribute summary.



28
29
30
# File 'lib/wavefront/batch_writer.rb', line 28

def summary
  @summary
end

Instance Method Details

#close_socketObject



241
242
243
244
245
# File 'lib/wavefront/batch_writer.rb', line 241

def close_socket
  return if opts[:noop]
  puts 'Closing connection to proxy.' if opts[:verbose]
  sock.close
end

#hash_to_wf(p) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/wavefront/batch_writer.rb', line 172

def hash_to_wf(p)
  #
  # Convert the hash received by the write() method to a string
  # conforming with that defined in
  # https://community.wavefront.com/docs/DOC-1031
  #
  fail ArgumentError unless p.key?(:path) && p.key?(:value) &&
                            p.key?(:source)

  m = [p[:path], p[:value]]
  m.<< p[:ts].to_i.to_s if p.key?(:ts) && p[:ts]
  m.<< 'source=' + p[:source]
  m.<< tag_hash_to_str(p[:tags]) if p.key?(:tags) && p[:tags]
  m.<< tag_hash_to_str(opts[:tags]) if opts[:tags]
  m.join(' ')
end

#open_socketObject



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/wavefront/batch_writer.rb', line 222

def open_socket
  #
  # Open a socket to a Wavefront proxy, putting the descriptor
  # in instance variable @sock.
  #
  if opts[:noop]
    puts 'No-op requested. Not opening connection to proxy.'
    return true
  end

  puts "Connecting to #{opts[:proxy]}:#{opts[:port]}." if opts[:verbose]

  begin
    @sock = TCPSocket.new(opts[:proxy], opts[:port])
  rescue
    raise Wavefront::Exception::InvalidEndpoint
  end
end

#send_point(point) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/wavefront/batch_writer.rb', line 199

def send_point(point)
  #
  # Send a point, which should already be in Wavefront wire
  # format.
  #
  if opts[:noop]
    puts "Would send: #{point}"
    return
  end

  puts "Sending: #{point}" if opts[:verbose] || opts[:debug]

  begin
    sock.puts(point)
    summary[:sent] += 1
    return true
  rescue
    summary[:unsent] += 1
    puts 'WARNING: failed to send point.'
    return false
  end
end

#setup_options(user, defaults) ⇒ Object



75
76
77
78
79
80
# File 'lib/wavefront/batch_writer.rb', line 75

def setup_options(user, defaults)
  #
  # Fill in some defaults, if the user hasn't supplied them
  #
  defaults.merge(user)
end

#tag_hash_to_str(tags) ⇒ Object



189
190
191
192
193
194
195
196
197
# File 'lib/wavefront/batch_writer.rb', line 189

def tag_hash_to_str(tags)
  #
  # Convert a hash of tags into a string of key="val" tags. The
  # quoting is recommended in the WF wire-format guide. No tag
  # validation is done here: we assume you used valid_tags()
  #
  return '' unless tags.is_a?(Hash)
  tags.map { |k, v| "#{k}=\"#{v}\"" }.join(' ')
end

#valid_path?(path) ⇒ Boolean

Returns:

  • (Boolean)


137
138
139
140
141
142
# File 'lib/wavefront/batch_writer.rb', line 137

def valid_path?(path)
  fail Wavefront::Exception::InvalidMetricName unless \
    path.is_a?(String) && path.match(/^[a-z0-9\-_\.]+$/) &&
    path.length < 1024
  true
end

#valid_point?(point) ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/wavefront/batch_writer.rb', line 123

def valid_point?(point)
  #
  # Validate a point so it conforms to the standard described in
  # https://community.wavefront.com/docs/DOC-1031
  #
  return true if opts.key?(:novalidate) && opts[:novalidate]
  valid_path?(point[:path])
  valid_value?(point[:value])
  valid_ts?(point[:ts]) if point[:ts]
  valid_source?(point[:source])
  valid_tags?(point[:tags]) if point[:tags] && point[:tags].length > 0
  true
end

#valid_source?(path) ⇒ Boolean

Returns:

  • (Boolean)


156
157
158
159
160
161
162
# File 'lib/wavefront/batch_writer.rb', line 156

def valid_source?(path)
  unless path.is_a?(String) && path.match(/^[a-z0-9\-_\.]+$/) &&
         path.length < 1024
    fail Wavefront::Exception::InvalidSource
  end
  true
end

#valid_tags?(tags) ⇒ Boolean

Returns:

  • (Boolean)


164
165
166
167
168
169
170
# File 'lib/wavefront/batch_writer.rb', line 164

def valid_tags?(tags)
  tags.each do |k, v|
    fail Wavefront::Exception::InvalidTag unless (k.length +
         v.length < 254) && k.match(/^[a-z0-9\-_\.]+$/)
  end
  true
end

#valid_ts?(ts) ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
152
153
154
# File 'lib/wavefront/batch_writer.rb', line 149

def valid_ts?(ts)
  unless ts.is_a?(Time) || ts.is_a?(Date)
    fail Wavefront::Exception::InvalidTimestamp
  end
  true
end

#valid_value?(value) ⇒ Boolean

Returns:

  • (Boolean)


144
145
146
147
# File 'lib/wavefront/batch_writer.rb', line 144

def valid_value?(value)
  fail Wavefront::Exception::InvalidMetricValue unless value.is_a?(Numeric)
  true
end

#write(points = [], options = {}) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/wavefront/batch_writer.rb', line 82

def write(points = [], options = {})
  #
  # Points are defined as hashes of the following form:
  # {
  #    path:   metrics path. String. Mandatory.
  #    value:  value of metric. Numeric. Mandatory.
  #    ts:     timestamp as a Time or Date object.  default:
  #            Time.now. May be omitted or false.
  #    source: originating source of metric. default: `hostname`
  #    tags:   optional hash of key: value point tags
  # }
  #
  # Send multiple points by using an array of the above hashes.
  #
  unless points.is_a?(Hash) || points.is_a?(Array)
    summary[:rejected] += 1
    return false
  end

  points = [points] if points.is_a?(Hash)

  points.each do |p|
    p[:ts] = Time.at(p[:ts]) if p[:ts].is_a?(Integer)
    begin
      valid_point?(p)
    rescue Wavefront::Exception::InvalidMetricName,
           Wavefront::Exception::InvalidMetricValue,
           Wavefront::Exception::InvalidTimestamp,
           Wavefront::Exception::InvalidSource,
           Wavefront::Exception::InvalidTag => e
      puts 'Invalid point, skipping.' if opts[:verbose]
      puts "Invalid point: #{p}. (#{e})" if opts[:debug]
      summary[:rejected] += 1
      next
    end

    send_point(hash_to_wf(p))
  end
  return summary[:rejected] == 0 ? true : false
end