Class: Wavefront::BatchWriter
- Inherits:
-
Object
- Object
- Wavefront::BatchWriter
show all
- Includes:
- Constants
- Defined in:
- lib/wavefront/batch_writer.rb
Overview
This class exists to facilitate sending of multiple data points to a Wavefront proxy. It sends points in native Wavefront format.
When initializing the instance you can define point tags which will apply to all points sent via that instance.
Though we provide methods to do it, it is the developer’s responsibility to open and close the socket to the proxy. Points are sent by calling the write() method.
The class keeps a count of the points the current instance has sent, rejected, and failed to send, in @summary (under the keys :sent, :rejected and :unsent). The socket is accessed through the instance variable @sock.
Constant Summary
Constants included
from Constants
Constants::ALERT_FORMATS, Constants::DEFAULT_ALERT_FORMAT, Constants::DEFAULT_FORMAT, Constants::DEFAULT_HOST, Constants::DEFAULT_INFILE_FORMAT, Constants::DEFAULT_OBSOLETE_METRICS, Constants::DEFAULT_PERIOD_SECONDS, Constants::DEFAULT_PREFIX_LENGTH, Constants::DEFAULT_PROXY, Constants::DEFAULT_PROXY_PORT, Constants::DEFAULT_STRICT, Constants::EVENT_LEVELS, Constants::EVENT_STATE_DIR, Constants::FORMATS, Constants::GRANULARITIES
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(options = {}) ⇒ BatchWriter
Returns a new instance of BatchWriter.
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/wavefront/batch_writer.rb', line 31
# Set up a new writer.
#
# Caller-supplied options are layered over the built-in defaults, the
# per-instance point counters start at zero, and any instance-wide
# point tags are validated before being remembered.
#
# @param options [Hash] overrides for the defaults. Recognised keys are
#   :tags, :proxy, :port, :noop, :novalidate, :verbose and :debug.
# @raise whatever valid_tags? raises when the supplied :tags are not
#   acceptable
def initialize(options = {})
  defaults = {
    tags:       false,
    proxy:      DEFAULT_PROXY,
    port:       DEFAULT_PROXY_PORT,
    noop:       false,
    novalidate: false,
    verbose:    false,
    debug:      false
  }

  @summary = { sent: 0, rejected: 0, unsent: 0 }
  @opts = setup_options(options, defaults)

  # Instance-wide tags are optional; nothing more to do without them.
  global = opts[:tags]
  return unless global

  valid_tags?(global)
  @global_tags = global
end
|
Instance Attribute Details
#opts ⇒ Object
Returns the value of attribute opts.
28
29
30
|
# File 'lib/wavefront/batch_writer.rb', line 28
# Reader for the options hash held in @opts.
#
# @return [Object] the current value of @opts (nil if never assigned)
def opts
  @opts
end
|
#sock ⇒ Object
Returns the value of attribute sock.
28
29
30
|
# File 'lib/wavefront/batch_writer.rb', line 28
# Reader for the proxy socket held in @sock.
#
# @return [Object] the current value of @sock (nil if never assigned)
def sock
  @sock
end
|
#summary ⇒ Object
Returns the value of attribute summary.
28
29
30
|
# File 'lib/wavefront/batch_writer.rb', line 28
# Reader for the point counters held in @summary.
#
# @return [Object] the current value of @summary (nil if never assigned)
def summary
  @summary
end
|
Instance Method Details
#close_socket ⇒ Object
241
242
243
244
245
|
# File 'lib/wavefront/batch_writer.rb', line 241
# Close the connection to the proxy.
#
# Nothing happens in no-op mode, or when there is no socket to close
# (for example because open_socket was never called), so this is safe
# to call unconditionally during clean-up.
def close_socket
  # A nil socket used to blow up with NoMethodError on #close.
  return if opts[:noop] || sock.nil?

  puts 'Closing connection to proxy.' if opts[:verbose]
  sock.close
end
|
#hash_to_wf(p) ⇒ Object
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
# File 'lib/wavefront/batch_writer.rb', line 172
# Turn a point hash into a single line of Wavefront wire format:
#
#   <path> <value> [<epoch seconds>] source=<source> [<tags>...]
#
# Point tags come first, followed by the instance-wide tags held in
# opts[:tags].
#
# @param p [Hash] the point. :path, :value and :source are required;
#   :ts and :tags are optional.
# @return [String] the fields joined by single spaces
# @raise [ArgumentError] if :path, :value or :source is missing
def hash_to_wf(p)
  unless p.key?(:path) && p.key?(:value) && p.key?(:source)
    fail ArgumentError, 'point must have :path, :value and :source'
  end

  fields = [p[:path], p[:value]]

  if p[:ts]
    ts = p[:ts]
    # valid_ts? accepts Date objects, but Date has no #to_i, so go via
    # Time. The defined? guard keeps this safe if 'date' is not loaded.
    ts = ts.to_time if defined?(Date) && ts.is_a?(Date)
    fields << ts.to_i.to_s
  end

  fields << "source=#{p[:source]}"

  [p[:tags], opts[:tags]].each do |tags|
    next unless tags
    rendered = tag_hash_to_str(tags)
    # An empty tag set must not leave a stray separator in the line.
    fields << rendered unless rendered.empty?
  end

  fields.join(' ')
end
|
#open_socket ⇒ Object
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
|
# File 'lib/wavefront/batch_writer.rb', line 222
# Open a TCP connection to the proxy named by opts[:proxy] and
# opts[:port], keeping it in @sock.
#
# In no-op mode no connection is made; a notice is printed and true is
# returned instead.
#
# @return [true, TCPSocket] true in no-op mode, otherwise the new socket
# @raise [Wavefront::Exception::InvalidEndpoint] if connecting fails
def open_socket
  if opts[:noop]
    puts 'No-op requested. Not opening connection to proxy.'
    true
  else
    puts "Connecting to #{opts[:proxy]}:#{opts[:port]}." if opts[:verbose]

    begin
      @sock = TCPSocket.new(opts[:proxy], opts[:port])
    rescue
      raise Wavefront::Exception::InvalidEndpoint
    end
  end
end
|
#send_point(point) ⇒ Object
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
# File 'lib/wavefront/batch_writer.rb', line 199
# Write one already-formatted point to the proxy socket and update the
# counters in summary.
#
# In no-op mode the point is only echoed to standard out.
#
# @param point [String] a point in Wavefront wire format
# @return [Boolean, nil] true if the write succeeded, false if it
#   raised, nil in no-op mode
def send_point(point)
  if opts[:noop]
    puts "Would send: #{point}"
    return
  end

  chatty = opts[:verbose] || opts[:debug]
  puts "Sending: #{point}" if chatty

  begin
    sock.puts(point)
    summary[:sent] += 1
    true
  rescue
    # Any StandardError during the write counts the point as unsent.
    summary[:unsent] += 1
    puts 'WARNING: failed to send point.'
    false
  end
end
|
#setup_options(user, defaults) ⇒ Object
75
76
77
78
79
80
|
# File 'lib/wavefront/batch_writer.rb', line 75
# Combine user options with the defaults. Where both define a key the
# user's value wins. Neither argument is modified.
#
# @param user [Hash] options supplied by the caller
# @param defaults [Hash] fallback values
# @return [Hash] a new hash holding the merged options
def setup_options(user, defaults)
  merged = defaults.dup
  merged.update(user)
end
|
#tag_hash_to_str(tags) ⇒ Object
189
190
191
192
193
194
195
196
197
|
# File 'lib/wavefront/batch_writer.rb', line 189
# Render a hash of tags as space-separated key="value" pairs.
#
# @param tags [Hash] tag names mapped to tag values
# @return [String] the rendered tags; an empty string when tags is
#   not a Hash
def tag_hash_to_str(tags)
  if tags.is_a?(Hash)
    pairs = tags.map { |name, val| format('%s="%s"', name, val) }
    pairs.join(' ')
  else
    ''
  end
end
|
#valid_path?(path) ⇒ Boolean
137
138
139
140
141
142
|
# File 'lib/wavefront/batch_writer.rb', line 137
# Check that a metric path is a String of fewer than 1024 characters
# made up only of lower-case letters, digits, '-', '_' and '.'.
#
# @param path [Object] the candidate metric path
# @return [true] when the path is acceptable
# @raise [Wavefront::Exception::InvalidMetricName] otherwise
def valid_path?(path)
  # \A and \z anchor the whole string. The previous ^ and $ anchors
  # match per line, so any value whose first line looked valid was
  # accepted, embedded newlines and all.
  unless path.is_a?(String) && path.length < 1024 &&
         path.match(/\A[a-z0-9\-_\.]+\z/)
    fail Wavefront::Exception::InvalidMetricName
  end

  true
end
|
#valid_point?(point) ⇒ Boolean
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/wavefront/batch_writer.rb', line 123
# Run every applicable validator against a point. The validators
# signal failure by raising, so reaching the end means the point is
# acceptable.
#
# Validation is skipped entirely when opts[:novalidate] is set. The
# timestamp is only checked when present, and tags only when present
# and non-empty.
#
# @param point [Hash] the point, with :path, :value, :source and
#   optionally :ts and :tags
# @return [true]
# @raise whatever the individual valid_*? methods raise
def valid_point?(point)
  return true if opts.fetch(:novalidate, false)

  timestamp = point[:ts]
  tags = point[:tags]

  valid_path?(point[:path])
  valid_value?(point[:value])
  valid_ts?(timestamp) if timestamp
  valid_source?(point[:source])
  valid_tags?(tags) if tags && tags.length >= 1

  true
end
|
#valid_source?(path) ⇒ Boolean
156
157
158
159
160
161
162
|
# File 'lib/wavefront/batch_writer.rb', line 156
# Check that a source name is a String of fewer than 1024 characters
# made up only of lower-case letters, digits, '-', '_' and '.'.
#
# @param path [Object] the candidate source name
# @return [true] when the source is acceptable
# @raise [Wavefront::Exception::InvalidSource] otherwise
def valid_source?(path)
  # \A and \z anchor the whole string. The previous ^ and $ anchors
  # match per line, so a value with an embedded newline was accepted
  # as long as its first line looked valid.
  unless path.is_a?(String) && path.length < 1024 &&
         path.match(/\A[a-z0-9\-_\.]+\z/)
    fail Wavefront::Exception::InvalidSource
  end

  true
end
|
164
165
166
167
168
169
170
|
# File 'lib/wavefront/batch_writer.rb', line 164
# Check every tag in a hash. A tag is acceptable when its key contains
# only lower-case letters, digits, '-', '_' and '.', and the combined
# length of key and value is below 254 characters.
#
# Keys and values are compared in their string form, so symbol keys and
# numeric values are handled rather than raising NoMethodError.
#
# @param tags [Hash] tag names mapped to tag values
# @return [true] when every tag is acceptable
# @raise [Wavefront::Exception::InvalidTag] on the first bad tag
def valid_tags?(tags)
  tags.each do |k, v|
    key = k.to_s
    val = v.to_s

    # \A and \z anchor the whole key; ^ and $ only anchor a line and
    # would let keys containing newlines through.
    unless key.length + val.length < 254 &&
           key.match(/\A[a-z0-9\-_\.]+\z/)
      fail Wavefront::Exception::InvalidTag
    end
  end

  true
end
|
#valid_ts?(ts) ⇒ Boolean
149
150
151
152
153
154
|
# File 'lib/wavefront/batch_writer.rb', line 149
# Check that a timestamp is a Time or a Date (or a subclass of either).
#
# @param ts [Object] the candidate timestamp
# @return [true] when the timestamp is acceptable
# @raise [Wavefront::Exception::InvalidTimestamp] otherwise
def valid_ts?(ts)
  case ts
  when Time, Date
    true
  else
    fail Wavefront::Exception::InvalidTimestamp
  end
end
|
#valid_value?(value) ⇒ Boolean
144
145
146
147
|
# File 'lib/wavefront/batch_writer.rb', line 144
# Check that a metric value is numeric.
#
# @param value [Object] the candidate metric value
# @return [true] when the value is a Numeric
# @raise [Wavefront::Exception::InvalidMetricValue] otherwise
def valid_value?(value)
  return true if value.is_a?(Numeric)

  fail Wavefront::Exception::InvalidMetricValue
end
|
#write(points = [], options = {}) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/wavefront/batch_writer.rb', line 82
# Validate one or more points and send each acceptable one to the
# proxy.
#
# Integer timestamps are converted to Time before validation. The
# conversion is done on a copy, so the caller's hashes are left as they
# were passed in. Anything that is not a Hash, and any point that fails
# validation, is counted in summary[:rejected] and skipped.
#
# @param points [Hash, Array<Hash>] a single point, or a list of
#   points. Each point carries :path, :value and :source, and
#   optionally :ts and :tags.
# @param options [Hash] accepted for interface compatibility; this
#   method does not read it
# @return [Boolean] true when summary[:rejected] is zero once all the
#   points have been processed. NOTE: this method never resets that
#   counter, so a rejection recorded by an earlier call still makes
#   later calls return false.
def write(points = [], options = {})
  unless points.is_a?(Hash) || points.is_a?(Array)
    summary[:rejected] += 1
    return false
  end

  points = [points] if points.is_a?(Hash)

  points.each do |raw|
    # Indexing a non-Hash element with a symbol would raise; treat it
    # as one more rejected point instead.
    unless raw.is_a?(Hash)
      puts 'Invalid point, skipping.' if opts[:verbose]
      puts "Invalid point: #{raw}. (not a Hash)" if opts[:debug]
      summary[:rejected] += 1
      next
    end

    # Work on a shallow copy so the timestamp conversion below does not
    # alter the caller's data.
    point = raw.dup
    point[:ts] = Time.at(point[:ts]) if point[:ts].is_a?(Integer)

    begin
      valid_point?(point)
    rescue Wavefront::Exception::InvalidMetricName,
           Wavefront::Exception::InvalidMetricValue,
           Wavefront::Exception::InvalidTimestamp,
           Wavefront::Exception::InvalidSource,
           Wavefront::Exception::InvalidTag => e
      puts 'Invalid point, skipping.' if opts[:verbose]
      puts "Invalid point: #{point}. (#{e})" if opts[:debug]
      summary[:rejected] += 1
      next
    end

    send_point(hash_to_wf(point))
  end

  summary[:rejected] == 0
end
|