Class: WavefrontCli::BaseWrite

Inherits:
Base
  • Object
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/base_write.rb

Overview

Send points to a proxy.

Direct Known Subclasses

Report, Write

Constant Summary

Constants included from Constants

Constants::ALL_PAGE_SIZE, Constants::DEFAULT_OPTS, Constants::HUMAN_TIME_FORMAT, Constants::HUMAN_TIME_FORMAT_MS

Instance Attribute Summary

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary

Methods inherited from Base

#_sdk_class, #check_status, #conds_to_query, #dispatch, #display, #display_api_error, #display_no_api_response, #do_delete, #do_describe, #do_import, #do_list, #do_search, #do_tag_add, #do_tag_clear, #do_tag_delete, #do_tag_set, #do_tags, #do_undelete, #do_update, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #load_display_class, #load_file, #load_from_stdin, #mk_creds, #mk_opts, #no_api_response, #ok_exit, #options_and_exit, #parseable_output, #range_hash, #run, #search_key, #validate_id, #validate_input, #validate_opts, #validate_tags, #validator_exception, #validator_method

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmt ⇒ Object (readonly)

Returns the value of attribute fmt.



# File 'lib/wavefront-cli/base_write.rb', line 9

def fmt
  @fmt
end

Instance Method Details

#call_write(data, openclose = true) ⇒ Object

A wrapper which lets us send normal points, deltas, or distributions



# File 'lib/wavefront-cli/base_write.rb', line 63

def call_write(data, openclose = true)
  if options[:delta]
    wf.write_delta(data, openclose)
  else
    wf.write(data, openclose)
  end
end

#do_file ⇒ Object



# File 'lib/wavefront-cli/base_write.rb', line 31

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_point ⇒ Object

# File 'lib/wavefront-cli/base_write.rb', line 13

def do_point
  p = { path:  options[:'<metric>'],
        value: options[:'<value>'].delete('\\').to_f,
        tags:  tags_to_hash(options[:tag]) }

  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#enough_fields?(line) ⇒ Boolean

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

If the format string says we are expecting point tags, we may have more columns than the length of the format string.

Returns:

  • (Boolean)

Raises:

  • (WavefrontCli::Exception::UnparseableInput)

    if the number of columns does not match the format string

# File 'lib/wavefront-cli/base_write.rb', line 267

def enough_fields?(line)
  ncols = line.split.length
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length
  raise(WavefrontCli::Exception::UnparseableInput,
        format('Expected %s fields, got %s', fmt.length, ncols))
end
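
As an illustration of the column-count rule described above, here is a minimal standalone sketch, not the library's own code. It assumes fmt has already been split into an array of one-letter fields, and field_count_ok? is a hypothetical helper that returns a boolean instead of raising.

```ruby
# Hypothetical standalone helper, not part of WavefrontCli.
# fmt is assumed to be an array of one-letter field names.
def field_count_ok?(line, fmt)
  ncols = line.split.length
  # With point tags ('T') in the format, extra columns are allowed.
  return ncols >= fmt.length if fmt.include?('T')

  ncols == fmt.length
end

puts field_count_ok?('cpu.load 0.5', %w[m v])                  # exact match
puts field_count_ok?('cpu.load 0.5 junk', %w[m v])             # one column too many
puts field_count_ok?('cpu.load 0.5 dc=eu env=prod', %w[m v T]) # extras are tags
```

The point of the strict equality check is that a stray extra column in a tag-less format is rejected rather than silently ignored.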

#expand_dist(dist) ⇒ Object

We will let users write a distribution as ‘1 1 1’ or ‘3x1’ or even a mix of the two.



# File 'lib/wavefront-cli/base_write.rb', line 106

def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end
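
To make the two notations concrete, the sketch below repeats the expansion logic as a standalone method and runs it on a few made-up inputs. The sample values are illustrative only.

```ruby
# Standalone copy of the expansion logic, for illustration.
def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      # 'NxV' means the value V repeated N times.
      count, val = v.split('x', 2)
      Array.new(count.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end

p expand_dist(%w[1 1 1])                 # three separate values
p expand_dist(%w[3x1])                   # the same distribution, compact form
p expand_dist('2x1.5,4,1x7'.split(','))  # a mix of both notations
```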

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the metric path

Raises:

  • (TypeError)

    if field does not exist

# File 'lib/wavefront-cli/base_write.rb', line 143

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]
  raise
end
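
The three cases (path in the data, path as an option, or both) can be sketched as follows. This is a hypothetical standalone version rather than the library's code: metric_path is an invented name, fmt is assumed to be an array of one-letter fields, and prefix stands in for options[:metric].

```ruby
# Hypothetical standalone version of the path lookup.
# prefix stands in for options[:metric].
def metric_path(chunks, fmt, prefix = nil)
  idx = fmt.index('m')
  # No 'm' field in the data: fall back to the option, if there is one.
  return prefix if idx.nil? && prefix
  raise TypeError, 'no metric path in data or options' if idx.nil?

  # Both present: treat the option as a prefix.
  prefix ? [prefix, chunks[idx]].join('.') : chunks[idx]
end

puts metric_path(%w[load 0.5], %w[m v])          # path from the data
puts metric_path(%w[load 0.5], %w[m v], 'prod')  # option used as a prefix
puts metric_path(%w[0.5], %w[v], 'prod.load')    # path from the option only
```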

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the source, if it is there, or if not, the value passed through by -H, or the local hostname.

# File 'lib/wavefront-cli/base_write.rb', line 157

def extract_source(chunks)
  chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end
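
The fallback order can be shown with a small standalone sketch. source_for is a hypothetical helper, fmt is assumed to be an array of one-letter fields, and default stands in for the value given with -H.

```ruby
require 'socket'

# Hypothetical standalone version of the source lookup.
# default stands in for the value passed with -H.
def source_for(chunks, fmt, default = nil)
  idx = fmt.index('s')
  return chunks[idx] if idx

  # No 's' field in the data: use the option, then the local hostname.
  default || Socket.gethostname
end

puts source_for(%w[web01 0.5], %w[s v])   # source taken from the data
puts source_for(%w[0.5], %w[v], 'box01')  # source taken from the option
puts source_for(%w[0.5], %w[v])           # falls back to this machine's hostname
```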

#extract_tags(chunks) ⇒ Object



# File 'lib/wavefront-cli/base_write.rb', line 130

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(/\s(?=(?:[^"]|"[^"]*")*$)/))
end

#extract_ts(chunks) ⇒ Float

Find and return the timestamp in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the timestamp, if it is there, or the current UTC time if it is not.



# File 'lib/wavefront-cli/base_write.rb', line 123

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the value

Raises:

  • (TypeError)

    if field does not exist

  • (Wavefront::Exception::InvalidValue)

    if it’s not a value

# File 'lib/wavefront-cli/base_write.rb', line 92

def extract_value(chunks)
  if fmt.include?('v')
    v = chunks[fmt.index('v')]
    v.to_f
  else
    raw = chunks[fmt.index('d')].split(',')
    xpanded = expand_dist(raw)
    wf.mk_distribution(xpanded)
  end
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, letting the -T values win if there is a collision.



# File 'lib/wavefront-cli/base_write.rb', line 203

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag])
  file_tags.merge(opt_tags)
end
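
The collision rule falls out of how Hash#merge works: keys in the argument overwrite keys in the receiver. A minimal sketch, with merge_tags as a hypothetical helper and made-up tag values:

```ruby
# Hypothetical helper: file tags are the receiver, option tags the argument,
# so on a key collision the option value replaces the file value.
def merge_tags(file_tags, opt_tags)
  file_tags.merge(opt_tags)
end

file_tags = { 'env' => 'dev', 'dc' => 'eu' } # as parsed from an input line
opt_tags  = { 'env' => 'prod' }              # as given with -T

merge_tags(file_tags, opt_tags).each { |k, v| puts "#{k}=#{v}" }
```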

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



# File 'lib/wavefront-cli/base_write.rb', line 40

def process_input(file)
  if file == '-'
    read_stdin
  else
    data = process_input_file(load_data(Pathname.new(file)).split("\n"))
    call_write(data)
  end
end

#process_input_file(data) ⇒ Object



# File 'lib/wavefront-cli/base_write.rb', line 49

def process_input_file(data)
  data.each_with_object([]) do |l, a|
    begin
      a << process_line(l)
    rescue WavefrontCli::Exception::UnparseableInput => e
      puts "Bad input. #{e.message}."
      next
    end
  end
end

#process_line(line) ⇒ Object

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.

Raises:

  • (WavefrontCli::Exception::UnparseableInput)

    if the line doesn’t look right

# File 'lib/wavefront-cli/base_write.rb', line 176

def process_line(line)
  return true if line.empty?
  chunks = line.split(/\s+/, fmt.length)
  enough_fields?(line) # can raise exception

  begin
    point = { path:  extract_path(chunks),
              tags:  line_tags(chunks),
              value: extract_value(chunks) }

    point[:ts]       = extract_ts(chunks)        if fmt.include?('t')
    point[:source]   = extract_source(chunks)    if fmt.include?('s')
    point[:interval] = options[:interval] || 'm' if fmt.include?('d')
  rescue TypeError
    raise(WavefrontCli::Exception::UnparseableInput,
          "could not process #{line}")
  end

  point
end
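
The way extra columns end up as point tags follows from splitting the line with a limit. The sketch below is illustrative only: chunk_line is a hypothetical helper, fmt is assumed to be an array of one-letter fields, and the sample line is made up.

```ruby
# Hypothetical helper showing how a line is chunked against the format.
def chunk_line(line, fmt)
  # A limit of fmt.length stops splitting once every defined field has a
  # chunk, so anything left over stays together in the final chunk.
  chunks = line.split(/\s+/, fmt.length)
  fmt.zip(chunks).to_h
end

fields = chunk_line('cpu.load 0.5 env=prod dc=eu', %w[m v T])
fields.each { |letter, chunk| puts "#{letter}: #{chunk}" }
```

Because the final chunk keeps its internal whitespace, the tag field can carry any number of key=value pairs.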

#read_stdin ⇒ Object

Read from standard input and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



# File 'lib/wavefront-cli/base_write.rb', line 75

def read_stdin
  open_connection
  STDIN.each_line { |l| call_write(process_line(l.strip), false) }
  close_connection
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#send_point(point) ⇒ Object

# File 'lib/wavefront-cli/base_write.rb', line 24

def send_point(point)
  call_write(point)
rescue Wavefront::Exception::InvalidEndpoint
  abort format("Could not connect to proxy '%s:%s'.",
               options[:proxy], options[:port])
end

#tags_to_hash(tags) ⇒ Object

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If the key or value is quoted, we remove the quotes.

Parameters:

  • tags (Array)

    key=value tags

Returns:

  • (Hash)

    key: value tags, or nil if no tags are given

# File 'lib/wavefront-cli/base_write.rb', line 217

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end
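
A standalone sketch of the same conversion, run on made-up tags, shows the quote stripping and the dropping of malformed entries. It is written without in-place mutation and is not the library's exact code.

```ruby
# Standalone sketch of the tag conversion, for illustration only.
def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    next unless v # not of the form key=val, so drop it

    # Strip one leading and one trailing quote from key and value.
    ret[k.gsub(/^["']|["']$/, '')] = v.gsub(/^["']|["']$/, '')
  end
end

tags = tags_to_hash(['env=prod', '"dc"="eu west"', 'notatag'])
tags.each { |k, v| puts "#{k} => #{v}" }
```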

#valid_format?(fmt) ⇒ Boolean

The format string must contain values. They can be single values or distributions, so we must have ‘v’ xor ‘d’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, ‘d’, or ‘v’, and the ‘T’, if there, must be at the end. No letter may appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)

Raises:

  • (WavefrontCli::Exception::UnparseableInput)

    if the format string is not valid

# File 'lib/wavefront-cli/base_write.rb', line 238

def valid_format?(fmt)
  err = if fmt.include?('v') && fmt.include?('d')
          "'v' and 'd' are mutually exclusive"
        elsif !fmt.include?('v') && !fmt.include?('d')
          "format string must include 'v' or 'd'"
        elsif !fmt.match(/^[dmstTv]+$/)
          'unsupported field in format string'
        elsif fmt != fmt.split('').uniq.join
          'repeated field in format string'
        elsif fmt.include?('T') && !fmt.end_with?('T')
          "if used, 'T' must come at end of format string"
        end

  return true if err.nil?

  raise(WavefrontCli::Exception::UnparseableInput, err)
end
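
The rules in the description can be exercised with a small standalone sketch. format_error is a hypothetical helper that returns the error message, or nil for a valid format, instead of raising, and the sample format strings are made up.

```ruby
# Hypothetical helper: returns an error message, or nil if fmt is valid.
def format_error(fmt)
  has_v = fmt.include?('v')
  has_d = fmt.include?('d')

  if has_v && has_d
    "'v' and 'd' are mutually exclusive"
  elsif !has_v && !has_d
    "format string must include 'v' or 'd'"
  elsif !fmt.match?(/\A[dmstTv]+\z/)
    'unsupported field in format string'
  elsif fmt.chars.uniq.length != fmt.length
    'repeated field in format string'
  elsif fmt.include?('T') && !fmt.end_with?('T')
    "if used, 'T' must come at end of format string"
  end
end

%w[tmv mvT mvd mt mxv mmv mTv].each do |f|
  puts format('%-4s %s', f, format_error(f) || 'ok')
end
```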

#valid_timestamp?(timestamp) ⇒ Boolean

Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/base_write.rb', line 280

def valid_timestamp?(timestamp)
  (timestamp.is_a?(Integer) || timestamp.match(/^\d+$/)) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (Time.now.to_i + 31_557_600)
end
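
The two bounds are easier to read once spelled out: 946_684_800 is 2000-01-01 00:00:00 UTC in epoch seconds, and 31_557_600 is the number of seconds in a 365.25-day year. The sketch below is a hypothetical standalone variant, plausible_timestamp?, which takes the current time as a parameter so that the check is repeatable.

```ruby
# Hypothetical standalone variant of the range check. now is injectable so
# results do not depend on when the code is run.
def plausible_timestamp?(timestamp, now = Time.now.to_i)
  return false unless timestamp.is_a?(Integer) || timestamp.to_s.match?(/\A\d+\z/)

  ts = timestamp.to_i
  ts > 946_684_800 &&       # after 2000-01-01 00:00:00 UTC
    ts < now + 31_557_600   # less than a year (365.25 days) ahead
end

now = 1_700_000_000 # a fixed reference time, chosen for the example
puts plausible_timestamp?('1500000000', now)     # inside the window
puts plausible_timestamp?(100, now)              # far too early
puts plausible_timestamp?('yesterday', now)      # not a number
puts plausible_timestamp?(now + 40_000_000, now) # more than a year ahead
```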