Class: WavefrontCli::BaseWrite
- Includes: Wavefront::Mixins
- Defined in: lib/wavefront-cli/base_write.rb
Overview
Send points to a proxy.
Constant Summary
Constants included from Constants
Constants::ALL_PAGE_SIZE, Constants::DEFAULT_OPTS, Constants::HUMAN_TIME_FORMAT, Constants::HUMAN_TIME_FORMAT_MS
Instance Attribute Summary
- #fmt ⇒ Object (readonly)
  Returns the value of attribute fmt.
Attributes inherited from Base
#klass, #klass_word, #options, #wf
Instance Method Summary
- #call_write(data, openclose = true) ⇒ Object
  A wrapper which lets us send normal points, deltas, or distributions.
- #do_file ⇒ Object
- #do_point ⇒ Object
  Send a single point, built from the command-line options.
- #enough_fields?(line) ⇒ Boolean
  Make sure we have the right number of columns, according to the format string.
- #expand_dist(dist) ⇒ Object
  We will let users write a distribution as ‘1 1 1’ or ‘3x1’ or even a mix of the two.
- #extract_path(chunks) ⇒ Object
  Find and return the metric path in a chunked line of input.
- #extract_source(chunks) ⇒ Object
  Find and return the source in a chunked line of input.
- #extract_tags(chunks) ⇒ Object
- #extract_ts(chunks) ⇒ Float
  Find and return the timestamp in a chunked line of input.
- #extract_value(chunks) ⇒ Object
  Find and return the value in a chunked line of input.
- #line_tags(chunks) ⇒ Object
  We can get tags from the file, from the -T option, or both.
- #process_input(file) ⇒ Object
  Read the input, from a file or from STDIN, and turn each line into Wavefront points.
- #process_input_file(data) ⇒ Object
- #process_line(line) ⇒ Object
  Process a line of input, as described by the format string held in @fmt.
- #read_stdin ⇒ Object
  Read from standard in and stream points through an open socket.
- #send_point(point) ⇒ Object
  Send a point, aborting if the proxy cannot be reached.
- #tags_to_hash(tags) ⇒ Object
  Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags.
- #valid_format?(fmt) ⇒ Boolean
  The format string must contain values.
- #valid_timestamp?(timestamp) ⇒ Boolean
  Although the SDK does value checking, we’ll add another layer of input checking here.
Methods inherited from Base
#_sdk_class, #check_status, #conds_to_query, #dispatch, #display, #display_api_error, #display_no_api_response, #do_delete, #do_describe, #do_import, #do_list, #do_search, #do_tag_add, #do_tag_clear, #do_tag_delete, #do_tag_set, #do_tags, #do_undelete, #do_update, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #load_display_class, #load_file, #load_from_stdin, #mk_creds, #mk_opts, #no_api_response, #ok_exit, #options_and_exit, #parseable_output, #range_hash, #run, #search_key, #validate_id, #validate_input, #validate_opts, #validate_tags, #validator_exception, #validator_method
Constructor Details
This class inherits a constructor from WavefrontCli::Base
Instance Attribute Details
#fmt ⇒ Object (readonly)
Returns the value of attribute fmt.
    # File 'lib/wavefront-cli/base_write.rb', line 9

    def fmt
      @fmt
    end
Instance Method Details
#call_write(data, openclose = true) ⇒ Object
A wrapper which lets us send normal points, deltas, or distributions.

    # File 'lib/wavefront-cli/base_write.rb', line 63

    def call_write(data, openclose = true)
      if options[:delta]
        wf.write_delta(data, openclose)
      else
        wf.write(data, openclose)
      end
    end
#do_file ⇒ Object
    # File 'lib/wavefront-cli/base_write.rb', line 31

    def do_file
      valid_format?(options[:infileformat])
      setup_fmt(options[:infileformat] || 'tmv')
      process_input(options[:'<file>'])
    end
#do_point ⇒ Object
Send a single point, built from the command-line options. The source and timestamp are only set if the user supplied them.

    # File 'lib/wavefront-cli/base_write.rb', line 13

    def do_point
      p = { path: options[:'<metric>'],
            value: options[:'<value>'].delete('\\').to_f,
            tags: tags_to_hash(options[:tag]) }

      p[:source] = options[:host] if options[:host]
      p[:ts] = parse_time(options[:time]) if options[:time]
      send_point(p)
    end
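The value handling in #do_point strips backslashes before converting to a float. A minimal sketch of that step, assuming the backslash is there so a negative value can be escaped on the command line; `parse_value` is a hypothetical helper name used only for illustration, not a method of this class.

```ruby
# Hypothetical helper mirroring the value conversion in do_point:
# remove any backslashes, then convert what is left to a Float.
def parse_value(raw)
  raw.delete('\\').to_f
end

escaped = '\\-5.2' # the five characters \-5.2
puts parse_value(escaped)
puts parse_value('42')
```

Note that `to_f` never raises: a string with no leading number converts to 0.0, so this step does not validate the value on its own.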
#enough_fields?(line) ⇒ Boolean
Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.
If the format string says we are expecting point tags, we may have more columns than the length of the format string.
    # File 'lib/wavefront-cli/base_write.rb', line 267

    def enough_fields?(line)
      ncols = line.split.length
      return true if fmt.include?('T') && ncols >= fmt.length
      return true if ncols == fmt.length

      raise(WavefrontCli::Exception::UnparseableInput,
            format('Expected %s fields, got %s', fmt.length, ncols))
    end
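To illustrate the column check outside the class, here is a sketch that takes the format as an explicit argument. Passing `fmt` in as an array of letters and raising `ArgumentError` (instead of the CLI's own exception class) are assumptions made to keep the example self-contained.

```ruby
# Standalone sketch of the column-count check. `fmt` is an array of
# format letters; ArgumentError stands in for the CLI's exception.
def enough_fields?(line, fmt)
  ncols = line.split.length
  # With a trailing tag field, any number of extra columns is fine.
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  raise ArgumentError,
        format('Expected %s fields, got %s', fmt.length, ncols)
end

puts enough_fields?('cpu.load 0.5 env=prod dc=west', %w[m v T])

begin
  enough_fields?('cpu.load 0.5', %w[m v s])
rescue ArgumentError => e
  puts e.message
end
```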
#expand_dist(dist) ⇒ Object
We will let users write a distribution as ‘1 1 1’ or ‘3x1’ or even a mix of the two.

    # File 'lib/wavefront-cli/base_write.rb', line 106

    def expand_dist(dist)
      dist.map do |v|
        if v.is_a?(String) && v.include?('x')
          x, val = v.split('x', 2)
          Array.new(x.to_i, val.to_f)
        else
          v.to_f
        end
      end.flatten
    end
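As a worked example of the ‘3x1’ shorthand, the sketch below runs the same expansion as a free function on a comma-separated field. The input string is made up for illustration.

```ruby
# Expand "NxV" entries into N copies of V; plain entries become floats.
def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      count, val = v.split('x', 2)
      Array.new(count.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end

raw = '3x1,2,2x5.5'.split(',')
p expand_dist(raw) # three 1.0s, one 2.0, two 5.5s
```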
#extract_path(chunks) ⇒ Object
Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.
Parameters:
- chunks [Array] a chunked line of input from #process_line
Returns:
- [String] the metric path
Raises:
- TypeError if the field does not exist

    # File 'lib/wavefront-cli/base_write.rb', line 143

    def extract_path(chunks)
      m = chunks[fmt.index('m')]
      options[:metric] ? [options[:metric], m].join('.') : m
    rescue TypeError
      return options[:metric] if options[:metric]

      raise
    end
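The prefix behaviour and the `rescue TypeError` fallback can be seen in isolation in the sketch below. It relies on `Array#[]` raising `TypeError` when given `nil`, which is what `fmt.index('m')` returns if the format has no metric field. Passing `fmt` and the prefix as arguments is an assumption made for the example.

```ruby
# Sketch: `prefix` stands in for the metric option, `fmt` is an array
# of format letters. chunks[nil] raises TypeError, which triggers the
# fallback to the prefix alone.
def extract_path(chunks, fmt, prefix = nil)
  m = chunks[fmt.index('m')]
  prefix ? [prefix, m].join('.') : m
rescue TypeError
  return prefix if prefix

  raise
end

puts extract_path(%w[cpu.load 0.5], %w[m v])        # path from data only
puts extract_path(%w[cpu.load 0.5], %w[m v], 'dev') # option used as prefix
puts extract_path(%w[0.5], %w[v], 'dev')            # option only
```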
#extract_source(chunks) ⇒ Object
Find and return the source in a chunked line of input.
Parameters:
- chunks [Array] a chunked line of input from #process_line
Returns:
- [String] the source, if it is there, or if not, the value passed through by -H, or the local hostname.

    # File 'lib/wavefront-cli/base_write.rb', line 157

    def extract_source(chunks)
      chunks[fmt.index('s')]
    rescue TypeError
      options[:source] || Socket.gethostname
    end
#extract_tags(chunks) ⇒ Object
    # File 'lib/wavefront-cli/base_write.rb', line 130

    def extract_tags(chunks)
      tags_to_hash(chunks.last.split(/\s(?=(?:[^"]|"[^"]*")*$)/))
    end
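The regular expression in #extract_tags splits on whitespace only where the rest of the string contains balanced double quotes, so spaces inside a quoted tag value are preserved. A small sketch with made-up tags; the constant name is hypothetical.

```ruby
# Split on whitespace that is not inside a double-quoted section.
# The lookahead succeeds only if the remainder of the string is made
# of non-quote characters and complete "..." pairs.
QUOTE_AWARE_SPACE = /\s(?=(?:[^"]|"[^"]*")*$)/

tags = 'env=prod dc="us west" role=db'.split(QUOTE_AWARE_SPACE)
tags.each { |t| puts t }
```

The quotes themselves are left in place by the split; they still have to be removed when the pairs are turned into a hash.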
#extract_ts(chunks) ⇒ Float
Find and return the timestamp in a chunked line of input. If the format string has no ‘t’ field, fall back to the current UTC time.

    # File 'lib/wavefront-cli/base_write.rb', line 123

    def extract_ts(chunks)
      ts = chunks[fmt.index('t')]
      return parse_time(ts) if valid_timestamp?(ts)
    rescue TypeError
      Time.now.utc.to_i
    end
#extract_value(chunks) ⇒ Object
Find and return the value in a chunked line of input.

Parameters:
- chunks [Array] a chunked line of input from #process_line
Returns:
- [Float] the value
Raises:
- TypeError if the field does not exist
- Wavefront::Exception::InvalidValue if it’s not a value

    # File 'lib/wavefront-cli/base_write.rb', line 92

    def extract_value(chunks)
      if fmt.include?('v')
        v = chunks[fmt.index('v')]
        v.to_f
      else
        raw = chunks[fmt.index('d')].split(',')
        xpanded = expand_dist(raw)
        wf.mk_distribution(xpanded)
      end
    end
#line_tags(chunks) ⇒ Object
We can get tags from the file, from the -T option, or both. Merge them, making the -T win if there is a collision.
    # File 'lib/wavefront-cli/base_write.rb', line 203

    def line_tags(chunks)
      file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
      opt_tags = tags_to_hash(options[:tag])
      file_tags.merge(opt_tags)
    end
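The "-T wins" rule follows from how `Hash#merge` resolves duplicate keys: the value from the hash passed as the argument replaces the receiver's. A short illustration with made-up tags; the variable names are hypothetical.

```ruby
# Tags parsed from the input line, and tags given on the command line.
file_tags = { 'env' => 'dev', 'dc' => 'west' }
opt_tags  = { 'env' => 'prod' }

# On a key collision, merge keeps the argument's value.
merged = file_tags.merge(opt_tags)
puts merged['env'] # the command-line value
puts merged['dc']  # untouched, only present in the file
```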
#process_input(file) ⇒ Object
Read the input, from a file or from STDIN, and turn each line into Wavefront points.
    # File 'lib/wavefront-cli/base_write.rb', line 40

    def process_input(file)
      if file == '-'
        read_stdin
      else
        data = process_input_file(load_data(Pathname.new(file)).split("\n"))
        call_write(data)
      end
    end
#process_input_file(data) ⇒ Object
    # File 'lib/wavefront-cli/base_write.rb', line 49

    def process_input_file(data)
      data.each_with_object([]) do |l, a|
        begin
          a << process_line(l)
        rescue WavefrontCli::Exception::UnparseableInput => e
          puts "Bad input. #{e.message}."
          next
        end
      end
    end
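The pattern in #process_input_file is to collect every line that parses and report, but otherwise skip, every line that does not. A minimal sketch of that pattern; `BadLine`, `parse`, and `parse_all` are hypothetical stand-ins, not part of the CLI.

```ruby
# Hypothetical exception and parser, used only to show the pattern.
class BadLine < StandardError; end

def parse(line)
  path, value = line.split
  raise BadLine, "cannot parse '#{line}'" unless path && value

  { path: path, value: value.to_f }
end

# Accumulate good lines; a bad line is reported and skipped, so one
# malformed row does not abort the whole file.
def parse_all(lines)
  lines.each_with_object([]) do |l, acc|
    begin
      acc << parse(l)
    rescue BadLine => e
      puts "Bad input. #{e.message}."
    end
  end
end

points = parse_all(['cpu.load 0.5', 'broken', 'mem.used 1024'])
puts points.length
```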
#process_line(line) ⇒ Object
Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.
We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.
    # File 'lib/wavefront-cli/base_write.rb', line 176

    def process_line(line)
      return true if line.empty?

      chunks = line.split(/\s+/, fmt.length)
      enough_fields?(line) # can raise exception

      begin
        point = { path: extract_path(chunks),
                  tags: line_tags(chunks),
                  value: extract_value(chunks) }
        point[:ts] = extract_ts(chunks) if fmt.include?('t')
        point[:source] = extract_source(chunks) if fmt.include?('s')
        point[:interval] = options[:interval] || 'm' if fmt.include?('d')
      rescue TypeError
        raise(WavefrontCli::Exception::UnparseableInput,
              "could not process #{line}")
      end

      point
    end
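The "anything beyond what they define is point tags" behaviour comes from the limit argument to `String#split`: with a limit of `fmt.length`, everything after the last defined field stays together in the final chunk. A sketch with a made-up line, assuming `fmt` is an array of format letters.

```ruby
fmt  = %w[m v T]
line = 'cpu.load 0.5 env=prod dc="us west"'

# The limit caps the number of chunks, so the remainder of the line
# (all the tags) ends up unsplit in the last element.
chunks = line.split(/\s+/, fmt.length)
chunks.each { |c| puts c }
```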
#read_stdin ⇒ Object
Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.
    # File 'lib/wavefront-cli/base_write.rb', line 75

    def read_stdin
      open_connection
      STDIN.each_line { |l| call_write(process_line(l.strip), false) }
      close_connection
    rescue SystemExit, Interrupt
      puts 'ctrl-c. Exiting.'
      wf.close
      exit 0
    end
#send_point(point) ⇒ Object
Send a point, aborting with an error message if the proxy cannot be reached.

    # File 'lib/wavefront-cli/base_write.rb', line 24

    def send_point(point)
      call_write(point)
    rescue Wavefront::Exception::InvalidEndpoint
      abort format("Could not connect to proxy '%s:%s'.",
                   options[:proxy], options[:port])
    end
#tags_to_hash(tags) ⇒ Object
Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.
Returns:
- [Hash]

    # File 'lib/wavefront-cli/base_write.rb', line 217

    def tags_to_hash(tags)
      return nil unless tags

      [tags].flatten.each_with_object({}) do |t, ret|
        k, v = t.split('=', 2)
        k.gsub!(/^["']|["']$/, '')
        ret[k] = v.to_s.gsub(/^["']|["']$/, '') if v
      end
    end
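The example below runs the same conversion as a free function, to show quote stripping and the dropping of entries that are not of the form key=val. It assumes the guard clause returns nil when no tags are supplied; the sample tags are made up.

```ruby
# Turn ["k=v", ...] into { "k" => "v", ... }, stripping one leading
# and one trailing quote from keys and values, and skipping entries
# with no '='.
def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end

hash = tags_to_hash(['env=prod', '"dc"="us west"', 'orphan'])
hash.each { |k, v| puts "#{k}: #{v}" }
```

Because the input is wrapped in an array and flattened, a single string such as 'env=prod' is accepted as well as an array of strings.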
#valid_format?(fmt) ⇒ Boolean
The format string must contain values. They can be single values or distributions. So we must have ‘v’ xor ‘d’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, ‘d’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.
    # File 'lib/wavefront-cli/base_write.rb', line 238

    def valid_format?(fmt)
      err = if fmt.include?('v') && fmt.include?('d')
              "'v' and 'd' are mutually exclusive"
            elsif !fmt.include?('v') && !fmt.include?('d')
              "format string must include 'v' or 'd'"
            elsif !fmt.match(/^[dmstTv]+$/)
              'unsupported field in format string'
            elsif fmt != fmt.split('').uniq.join
              # `!fmt == ...` would negate fmt first and never match
              'repeated field in format string'
            elsif fmt.include?('T') && !fmt.end_with?('T')
              "if used, 'T' must come at end of format string"
            end

      return true if err.nil?

      raise(WavefrontCli::Exception::UnparseableInput, err)
    end
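The rules above can be exercised with a sketch that returns the error message instead of raising. `format_error` is a hypothetical name, and returning nil for a valid format is a choice made for this example. The uniqueness check is written as `fmt != fmt.chars.uniq.join`, because `!fmt == ...` negates `fmt` first and so can never be true.

```ruby
# Return a description of what is wrong with a format string, or nil
# if it satisfies every rule.
def format_error(fmt)
  if fmt.include?('v') && fmt.include?('d')
    "'v' and 'd' are mutually exclusive"
  elsif !fmt.include?('v') && !fmt.include?('d')
    "format string must include 'v' or 'd'"
  elsif !fmt.match?(/\A[dmstTv]+\z/)
    'unsupported field in format string'
  elsif fmt != fmt.chars.uniq.join
    'repeated field in format string'
  elsif fmt.include?('T') && !fmt.end_with?('T')
    "if used, 'T' must come at end of format string"
  end
end

%w[tmv mvd mt mvx mmv mTv].each do |f|
  puts "#{f}: #{format_error(f) || 'ok'}"
end
```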
#valid_timestamp?(timestamp) ⇒ Boolean
Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

    # File 'lib/wavefront-cli/base_write.rb', line 280

    def valid_timestamp?(timestamp)
      (timestamp.is_a?(Integer) || timestamp.match(/^\d+$/)) &&
        timestamp.to_i > 946_684_800 &&
        timestamp.to_i < (Time.now.to_i + 31_557_600)
    end
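The two magic numbers are 946_684_800, the epoch time of 2000-01-01 00:00:00 UTC, and 31_557_600, the number of seconds in 365.25 days. The sketch below applies the same window with the current time passed in as a parameter, an assumption made so the result does not depend on when the code is run.

```ruby
# Accept integers or all-digit strings that fall between 2000-01-01
# and one year after `now` (both bounds exclusive).
def valid_timestamp?(timestamp, now = Time.now.to_i)
  (timestamp.is_a?(Integer) || timestamp.match(/^\d+$/)) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (now + 31_557_600)
end

now = 1_600_000_000
puts valid_timestamp?('1500000000', now) ? 'ok' : 'rejected'
puts valid_timestamp?(100, now) ? 'ok' : 'rejected'
puts valid_timestamp?('yesterday', now) ? 'ok' : 'rejected'
```

The method returns a truthy or falsy value rather than strictly true or false: a non-numeric string yields nil from `match`.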