Class: Wavefront::Cli::BatchWrite
Overview
Constant Summary
Wavefront::Constants::ALERT_FORMATS, Wavefront::Constants::DASH_FORMATS, Wavefront::Constants::DEFAULT_ALERT_FORMAT, Wavefront::Constants::DEFAULT_DASH_FORMAT, Wavefront::Constants::DEFAULT_FORMAT, Wavefront::Constants::DEFAULT_HOST, Wavefront::Constants::DEFAULT_INFILE_FORMAT, Wavefront::Constants::DEFAULT_OBSOLETE_METRICS, Wavefront::Constants::DEFAULT_OPTS, Wavefront::Constants::DEFAULT_PERIOD_SECONDS, Wavefront::Constants::DEFAULT_PREFIX_LENGTH, Wavefront::Constants::DEFAULT_PROXY, Wavefront::Constants::DEFAULT_PROXY_PORT, Wavefront::Constants::DEFAULT_SOURCE_FORMAT, Wavefront::Constants::DEFAULT_STRICT, Wavefront::Constants::EVENT_LEVELS, Wavefront::Constants::EVENT_STATE_DIR, Wavefront::Constants::FORMATS, Wavefront::Constants::GRANULARITIES, Wavefront::Constants::SOURCE_FORMATS
Instance Attribute Summary collapse
#arguments, #noop, #options
Instance Method Summary
collapse
Methods included from Mixins
#call_delete, #call_get, #call_post, #hash_to_qs, #interpolate_schema, #load_file, #parse_time, #time_to_ms, #uri_concat
#initialize
Constructor Details
This class inherits a constructor from Wavefront::Cli
Instance Attribute Details
#fmt ⇒ Object
Returns the value of attribute fmt.
10
11
12
|
# File 'lib/wavefront/cli/batch_write.rb', line 10
def fmt
@fmt
end
|
#opts ⇒ Object
Returns the value of attribute opts.
10
11
12
|
# File 'lib/wavefront/cli/batch_write.rb', line 10
def opts
@opts
end
|
#sock ⇒ Object
Returns the value of attribute sock.
10
11
12
|
# File 'lib/wavefront/cli/batch_write.rb', line 10
def sock
@sock
end
|
#wf ⇒ Object
Returns the value of attribute wf.
10
11
12
|
# File 'lib/wavefront/cli/batch_write.rb', line 10
def wf
@wf
end
|
Instance Method Details
#load_data(file) ⇒ Object
91
92
93
94
95
96
97
|
# File 'lib/wavefront/cli/batch_write.rb', line 91
def load_data(file)
begin
IO.read(file)
rescue
raise "Cannot open file '#{file}'." unless file.exist?
end
end
|
#process_filedata(data) ⇒ Object
99
100
101
102
103
104
105
|
# File 'lib/wavefront/cli/batch_write.rb', line 99
def process_filedata(data)
data.split("\n").each { |l| wf.write(process_line(l)) }
end
|
#process_line(l) ⇒ Object
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
# File 'lib/wavefront/cli/batch_write.rb', line 153
def process_line(l)
return true if l.empty?
m_prefix = opts[:prefix]
chunks = l.split(/\s+/, fmt.length)
begin
raise 'wrong number of fields' unless valid_line?(l)
begin
v = chunks[fmt.index('v')]
if valid_value?(v)
point = { value: v.to_f }
else
raise "invalid value '#{v}'"
end
rescue TypeError
raise "no value in '#{l}'"
end
point[:ts] = begin
ts = chunks[fmt.index('t')]
if valid_timestamp?(ts)
Time.at(parse_time(ts))
else
raise "invalid timestamp '#{ts}'"
end
rescue TypeError
Time.now.utc.to_i
end
point[:source] = begin
chunks[fmt.index('s')]
rescue TypeError
opts[:source]
end
begin
m = chunks[fmt.index('m')]
point[:path] = m_prefix.empty? ? m : [m_prefix, m].join('.')
rescue TypeError
if m_prefix
point[:path] = m_prefix
else
raise "metric path in '#{l}'"
end
end
rescue => e
puts "WARNING: #{e}. Skipping."
return false
end
if fmt.last == 'T'
point[:tags] =
tags_to_hash(chunks.last.split(/\s(?=(?:[^"]|"[^"]*")*$)/))
end
point
end
|
#run ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/wavefront/cli/batch_write.rb', line 22
def run
unless valid_format?(options[:infileformat])
raise 'Invalid format string.'
end
file = options[:'<file>']
setup_opts(options)
if options.key?(:infileformat)
setup_fmt(options[:infileformat])
else
setup_fmt
end
@wf = Wavefront::BatchWriter.new(options)
begin
wf.open_socket
rescue
raise 'unable to connect to proxy'
end
begin
if file == '-'
STDIN.each_line { |l| wf.write(process_line(l.strip)) }
else
process_filedata(load_data(Pathname.new(file)))
end
ensure
wf.close_socket
end
puts "Point summary: " + (%w(sent unsent rejected).map do |p|
[wf.summary[p.to_sym], p].join(' ')
end.join(', ')) + '.'
end
|
#setup_fmt(fmt = DEFAULT_INFILE_FORMAT) ⇒ Object
59
60
61
|
# File 'lib/wavefront/cli/batch_write.rb', line 59
def setup_fmt(fmt = DEFAULT_INFILE_FORMAT)
@fmt = fmt.split('')
end
|
#setup_opts(options) ⇒ Object
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/wavefront/cli/batch_write.rb', line 63
def setup_opts(options)
@opts = {
prefix: options[:metric] || '',
source: options[:host] || Socket.gethostname,
tags: tags_to_hash(options[:tag]),
endpoint: options[:proxy],
port: options[:port],
verbose: options[:verbose],
noop: options[:noop],
}
end
|
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/wavefront/cli/batch_write.rb', line 75
def tags_to_hash(tags)
tags = [tags] if tags.is_a?(String)
tags = {} unless tags.is_a?(Array)
tags.each_with_object({}) do |t, m|
k, v = t.split('=', 2)
m[k.gsub(/^["']|["']$/, '').to_sym] =
v.to_s.gsub(/^["']|["']$/, '') if v
end
end
|
107
108
109
110
111
112
113
114
115
|
# File 'lib/wavefront/cli/batch_write.rb', line 107
def valid_format?(fmt)
fmt.include?('v') && fmt.match(/^[mtv]+T?$/) && fmt ==
fmt.split('').uniq.join
end
|
#valid_line?(l) ⇒ Boolean
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# File 'lib/wavefront/cli/batch_write.rb', line 117
def valid_line?(l)
ncols = l.split.length
if fmt.include?('T')
return false unless ncols >= fmt.length
else
return false unless ncols == fmt.length
end
true
end
|
#valid_timestamp?(ts) ⇒ Boolean
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/wavefront/cli/batch_write.rb', line 138
def valid_timestamp?(ts)
(ts.is_a?(Integer) || ts.match(/^\d+$/)) &&
ts.to_i > 946684800 && ts.to_i < (Time.now.to_i + 31557600)
end
|
#valid_value?(val) ⇒ Boolean
149
150
151
|
# File 'lib/wavefront/cli/batch_write.rb', line 149
def valid_value?(val)
val.is_a?(Numeric) || (val.match(/^-?[\d\.e]+$/) && val.count('.') < 2)
end
|
#validate_opts ⇒ Object
15
16
17
18
19
20
|
# File 'lib/wavefront/cli/batch_write.rb', line 15
def validate_opts
abort 'Please supply a proxy endpoint.' unless options[:proxy]
end
|