Class: WavefrontCli::Write

Inherits:
Base
  • Object
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/write.rb

Overview

Send points via a proxy. This inherits from the same base class as Report, but has to do a couple of things differently, as it speaks to a proxy rather than to the API.

Constant Summary

SPLIT_PATTERN =
/\s(?=(?:[^"]|"[^"]*")*$)/.freeze
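The pattern splits on whitespace only when the rest of the line contains balanced double quotes, so spaces inside a quoted value are kept. A minimal sketch of that behaviour follows; the sample line is invented for illustration.

```ruby
# Copied from the constant above so the sketch runs on its own.
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Hypothetical input line: the quoted tag value contains a space.
line = 'test.metric 42 host1 env=prod note="two words"'

# The space inside the quotes is not treated as a separator.
fields = line.split(SPLIT_PATTERN)
p fields
```

The quoted value stays in one field, quotes included; removing the quotes is left to #tags_to_hash.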

Constants included from Constants

Constants::ALL_PAGE_SIZE, Constants::DEFAULT_CONFIG, Constants::DEFAULT_OPTS, Constants::EVENT_STATE_DIR, Constants::HUMAN_TIME_FORMAT, Constants::HUMAN_TIME_FORMAT_MS, Constants::SEARCH_SPLIT

Instance Attribute Summary

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary

Methods inherited from Base

#cannot_noop!, #check_response_blocks, #check_status, #cli_output_class, #conds_to_query, #dispatch, #display, #display_api_error, #display_no_api_response, #do_delete, #do_describe, #do_dump, #do_import, #do_list, #do_search, #do_set, #do_undelete, #dump_json, #dump_yaml, #extract_values, #failed_validation_message, #format_var, #handle_error, #handle_response, #hcl_fields, #import_message, #import_object, #import_to_create, #import_update, #initialize, #item_dump_call, #load_display_class, #load_file, #load_from_stdin, #load_json, #load_yaml, #matching_method, #method_word_list, #mk_opts, #name_of_do_method, #no_api_response, #ok_exit, #one_or_all, #options_and_exit, #parseable_output, #range_hash, #run, #search_key, #smart_delete, #smart_delete_message, #status_error_handler, #unsupported_format_message, #validate_id, #validate_input, #validate_tags, #validator_exception, #validator_method, #warning_message

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmt ⇒ Object (readonly)

Returns the value of attribute fmt.



# File 'lib/wavefront-cli/write.rb', line 13

def fmt
  @fmt
end

Instance Method Details

#_sdk_class ⇒ Object

I chose to prioritise UI consistency over internal elegance here. The write command doesn’t follow the age-old assumption that each command maps 1:1 to a similarly named SDK class. Write can use write or distribution.



# File 'lib/wavefront-cli/write.rb', line 70

def _sdk_class
  return 'Wavefront::Distribution' if distribution?

  'Wavefront::Write'
end

#call_write(data, openclose = true) ⇒ Object

A wrapper which lets us send normal points, deltas, or distributions



# File 'lib/wavefront-cli/write.rb', line 158

def call_write(data, openclose = true)
  if options[:delta]
    wf.write_delta(data, openclose)
  else
    wf.write(data, openclose)
  end
end

#close_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 119

def close_connection
  wf.close
end

#default_port ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 90

def default_port
  distribution? ? 40_000 : 2878
end

#distribution? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 76

def distribution?
  return true if options[:distribution]

  options[:infileformat]&.include?('d')
end

#do_distribution ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 37

def do_distribution
  send_point(make_distribution_point(tags_to_hash(options[:tag])))
end

#do_file ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 31

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_point ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 18

def do_point
  tags = tags_to_hash(options[:tag])

  p = { path: options[:'<metric>'],
        value: options[:'<value>'].delete('\\').to_f }

  p[:tags] = tags unless tags.empty?
  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#enough_fields?(line) ⇒ True

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

Parameters:

  • line (String)

    input line

Returns:

  • (True)

    if the number of fields is correct

Raises:

  • WavefrontCli::Exception::UnparseableInput if there are not the right number of fields.



# File 'lib/wavefront-cli/write.rb', line 389

def enough_fields?(line)
  ncols = line.split(SPLIT_PATTERN).length
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  raise(WavefrontCli::Exception::UnparseableInput,
        format('Expected %<expected>s fields, got %<got>s',
               expected: fmt.length,
               got: ncols))
end
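The column check can be tried outside the class with a small standalone sketch. As assumptions for illustration, this version takes the format as a second argument instead of reading the fmt attribute, and raises ArgumentError in place of WavefrontCli::Exception::UnparseableInput.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Hypothetical standalone variant of #enough_fields?.
# fmt is an array of single-character field codes, e.g. %w[m v T].
def enough_fields?(line, fmt)
  ncols = line.split(SPLIT_PATTERN).length
  # A trailing 'T' means "any number of tags", so extra columns are fine.
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  raise ArgumentError,
        format('Expected %<expected>s fields, got %<got>s',
               expected: fmt.length, got: ncols)
end

fmt = 'mvT'.chars
ok_exact = enough_fields?('cpu.load 0.5 env=prod', fmt)
ok_extra = enough_fields?('cpu.load 0.5 env=prod dc=lon', fmt)

# Too few columns: capture the error message instead of aborting.
too_few = begin
  enough_fields?('cpu.load', fmt)
rescue ArgumentError => e
  e.message
end
puts too_few
```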

#expand_dist(dist) ⇒ Object

Users can write a distribution as ‘1 1 1’ or ‘3x1’ (three occurrences of the value 1), or even a mix of the two.



# File 'lib/wavefront-cli/write.rb', line 201

def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end
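A quick way to see the expansion is to run the method on a mixed list. The sketch below repeats the method body so it can run on its own; the input values are invented.

```ruby
# Same logic as #expand_dist, repeated here so the sketch is self-contained.
def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      count, val = v.split('x', 2)
      Array.new(count.to_i, val.to_f) # '3x2' becomes [2.0, 2.0, 2.0]
    else
      v.to_f
    end
  end.flatten
end

expanded = expand_dist(%w[1 1 3x2 0.5])
p expanded # => [1.0, 1.0, 2.0, 2.0, 2.0, 0.5]
```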

#extra_options ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 61

def extra_options
  options[:using] ? { writer: options[:using] } : {}
end

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the metric path

Raises:

  • TypeError if the field does not exist



# File 'lib/wavefront-cli/write.rb', line 242

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]

  raise
end
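The three cases (path in the data, path from the option, or both) can be sketched in isolation. This is an illustrative variant, not the class's own method: the format and the metric option are passed as arguments named fmt and prefix, both of which are assumptions made for the example.

```ruby
# Hypothetical standalone variant of #extract_path.
def extract_path(chunks, fmt, prefix = nil)
  # fmt.index('m') is nil when the format has no 'm' field, and
  # indexing an Array with nil raises TypeError.
  m = chunks[fmt.index('m')]
  prefix ? [prefix, m].join('.') : m
rescue TypeError
  return prefix if prefix

  raise
end

from_data = extract_path(%w[cpu.load 1], %w[m v])
with_both = extract_path(%w[cpu.load 1], %w[m v], 'dev')
from_opt  = extract_path(%w[1], %w[v], 'dev.cpu.load')

# Neither source supplies a path: the TypeError is re-raised.
missing = begin
  extract_path(%w[1], %w[v])
rescue TypeError
  :type_error
end
p [from_data, with_both, from_opt, missing]
```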

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the source, if it is there, or if not, the value passed through by -H, or the local hostname.


# File 'lib/wavefront-cli/write.rb', line 257

def extract_source(chunks)
  chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end

#extract_tags(chunks) ⇒ Hash

Returns a hash of k = v tags.

Parameters:

  • chunks (Array)

    an input line broken into tokens. The final token will be a space-separated list of point tags.

Returns:

  • (Hash)

    of k = v tags.



# File 'lib/wavefront-cli/write.rb', line 229

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(SPLIT_PATTERN))
end

#extract_ts(chunks) ⇒ Float

Find and return the timestamp in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the timestamp, if it is there, or the current UTC time if it is not.



# File 'lib/wavefront-cli/write.rb', line 218

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the value

Raises:

  • TypeError if the field does not exist
  • Wavefront::Exception::InvalidValue if it’s not a value



# File 'lib/wavefront-cli/write.rb', line 187

def extract_value(chunks)
  if fmt.include?('v')
    v = chunks[fmt.index('v')]
    v.to_f
  else
    raw = chunks[fmt.index('d')].split(',')
    xpanded = expand_dist(raw)
    wf.mk_distribution(xpanded)
  end
end

#format_string_does_not_have_v_and_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains both ‘v’ and ‘d’.



# File 'lib/wavefront-cli/write.rb', line 350

def format_string_does_not_have_v_and_d?(fmt)
  return true unless fmt.include?('v') && fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "'v' and 'd' are mutually exclusive")
end

#format_string_has_big_t_only_at_the_end?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

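Raises:

  • WavefrontCli::Exception::UnparseableInput if ‘T’ is used anywhere but the end of the format string.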



# File 'lib/wavefront-cli/write.rb', line 371

def format_string_has_big_t_only_at_the_end?(fmt)
  return true unless fmt.include?('T')
  return true if fmt.end_with?('T')

  raise(WavefrontCli::Exception::UnparseableInput,
        "if used, 'T' must come at end of format string")
end

#format_string_has_unique_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if a field is repeated in the format string.



# File 'lib/wavefront-cli/write.rb', line 364

def format_string_has_unique_chars?(fmt)
  return true if fmt.chars.sort == fmt.chars.uniq.sort

  raise(WavefrontCli::Exception::UnparseableInput,
        'repeated field in format string')
end

#format_string_has_v_or_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains neither ‘v’ nor ‘d’.



# File 'lib/wavefront-cli/write.rb', line 343

def format_string_has_v_or_d?(fmt)
  return true if fmt.include?('v') || fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "format string must include 'v' or 'd'")
end

#format_string_is_all_valid_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains an unsupported field.



# File 'lib/wavefront-cli/write.rb', line 357

def format_string_is_all_valid_chars?(fmt)
  return true if fmt =~ /^[dmstTv]+$/

  raise(WavefrontCli::Exception::UnparseableInput,
        'unsupported field in format string')
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, making the -T win if there is a collision.



# File 'lib/wavefront-cli/write.rb', line 303

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag]) || {}
  file_tags.merge(opt_tags)
end
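The collision rule comes from Hash#merge, where the argument's values replace the receiver's on a shared key. A small sketch with invented tag values:

```ruby
# Tags parsed from the input line (hypothetical values).
file_tags = { env: 'dev', dc: 'lon' }
# Tags given with -T on the command line (hypothetical values).
opt_tags = { env: 'prod' }

# The hash passed to merge wins on a shared key, so -T overrides the file.
merged = file_tags.merge(opt_tags)
p merged # => {:env=>"prod", :dc=>"lon"} (Ruby 3.4+ prints {env: "prod", dc: "lon"})
```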

#load_data(file) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 418

def load_data(file)
  IO.read(file)
rescue StandardError
  raise WavefrontCli::Exception::FileNotFound
end

#make_distribution_point(tags) ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 42

def make_distribution_point(tags)
  { path: options[:'<metric>'],
    interval: options[:interval] || 'M',
    tags: tags,
    value: mk_dist }.tap do |p|
    p[:source] = options[:host] if options[:host]
    p[:ts] = parse_time(options[:time]) if options[:time]
  end
end

#mk_creds ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 82

def mk_creds
  { proxy: options[:proxy],
    port: options[:port] || default_port,
    socket: options[:socket],
    endpoint: options[:endpoint],
    token: options[:token] }
end

#mk_dist ⇒ Object

Turn our user’s representation of a distribution into one which suits Wavefront. The SDK can do this for us.



# File 'lib/wavefront-cli/write.rb', line 56

def mk_dist
  xpanded = expand_dist(options[:'<val>'])
  wf.mk_distribution(xpanded.map(&:to_f))
end

#open_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 115

def open_connection
  wf.open
end

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



# File 'lib/wavefront-cli/write.rb', line 132

def process_input(file)
  if file == '-'
    read_stdin
  else
    call_write(
      process_input_file(load_data(Pathname.new(file)).split("\n"))
    )
  end
end

#process_input_file(data) ⇒ Object

Parameters:

  • data (Array)

    lines of input



# File 'lib/wavefront-cli/write.rb', line 144

def process_input_file(data)
  data.each_with_object([]) do |l, a|
    begin
      a.<< process_line(l)
    rescue WavefrontCli::Exception::UnparseableInput => e
      puts "Bad input. #{e.message}."
      next
    end
  end
end

#process_line(line) ⇒ Hash

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.


Parameters:

  • line (String)

    a line of an input file

Returns:

  • (Hash)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the line doesn’t look right



# File 'lib/wavefront-cli/write.rb', line 278

def process_line(line)
  return true if line.empty?

  chunks = line.split(SPLIT_PATTERN, fmt.length)
  enough_fields?(line) # can raise exception

  point = { path: extract_path(chunks),
            value: extract_value(chunks) }

  tags = line_tags(chunks)

  point.tap do |p|
    p[:tags]     = tags unless tags.empty?
    p[:ts]       = extract_ts(chunks)        if fmt.include?('t')
    p[:source]   = extract_source(chunks)    if fmt.include?('s')
    p[:interval] = options[:interval] || 'm' if fmt.include?('d')
  end
end
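The "anything beyond the defined fields is tags" rule follows from splitting with a limit: String#split stops after fmt.length pieces, so every trailing tag lands in the last chunk. The sketch below shows only that chunking step, with a made-up format and input line, and leaves out the SDK calls and option handling.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

fmt  = 'tmvT'.chars # timestamp, metric, value, then tags
line = '1700000000 cpu.load 0.5 env=prod dc="lon 1"'

# The limit keeps all trailing tags together in the final chunk.
chunks = line.split(SPLIT_PATTERN, fmt.length)

# Fields are looked up by their position in the format string.
path  = chunks[fmt.index('m')]
value = chunks[fmt.index('v')].to_f

# The final chunk is split again to get the individual tags.
tags = chunks.last.split(SPLIT_PATTERN)
p chunks, path, value, tags
```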

#read_stdin ⇒ Object

Read from standard input and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



# File 'lib/wavefront-cli/write.rb', line 170

def read_stdin
  open_connection
  STDIN.each_line { |l| call_write(process_line(l.strip), false) }
  close_connection
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#send_point(point) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 123

def send_point(point)
  call_write(point)
rescue Wavefront::Exception::InvalidEndpoint
  abort format("Could not connect to proxy '%<proxy>s:%<port>s'.", options)
end

#setup_fmt(fmt) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 414

def setup_fmt(fmt)
  @fmt = fmt.split('')
end

#tags_to_hash(tags) ⇒ Hash

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.

Parameters:

  • tags (Array)

    key=value strings

Returns:

  • (Hash)

    of k: v tags



# File 'lib/wavefront-cli/write.rb', line 317

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k.to_sym] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end
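The dropping and unquoting rules are easiest to see on a mixed input. The sketch repeats the method (with a non-mutating gsub in place of gsub!, a small assumption that does not change the result) and feeds it invented tags.

```ruby
# Same logic as #tags_to_hash, repeated so the sketch is self-contained.
def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2) # split on the first '=' only
    k = k.gsub(/^["']|["']$/, '')
    ret[k.to_sym] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end

tags = tags_to_hash(['env=prod', '"dc"="lon 1"', 'junk', 'a=b=c'])
p tags

no_tags = tags_to_hash(nil)
p no_tags
```

Here 'junk' is dropped because it has no '=', and 'a=b=c' keeps everything after the first '=' as its value.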

#valid_format?(fmt) ⇒ Boolean

The format string must contain values. They can be single values or distributions. So we must have ‘v’ xor ‘d’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, ‘d’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 335

def valid_format?(fmt)
  format_string_has_v_or_d?(fmt)
  format_string_does_not_have_v_and_d?(fmt)
  format_string_is_all_valid_chars?(fmt)
  format_string_has_unique_chars?(fmt)
  format_string_has_big_t_only_at_the_end?(fmt)
end
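The five rules can be exercised together with a small standalone checker. Note that this is a hypothetical variant: the method name format_errors is invented, and it collects every failed rule into an array, whereas the methods above raise on the first failure.

```ruby
# Hypothetical helper: returns the messages for every rule the
# format string breaks, or an empty array if it is valid.
def format_errors(fmt)
  errors = []
  has_v = fmt.include?('v')
  has_d = fmt.include?('d')

  errors << "format string must include 'v' or 'd'" unless has_v || has_d
  errors << "'v' and 'd' are mutually exclusive" if has_v && has_d
  errors << 'unsupported field in format string' unless fmt.match?(/\A[dmstTv]+\z/)
  errors << 'repeated field in format string' unless fmt.chars == fmt.chars.uniq
  if fmt.include?('T') && !fmt.end_with?('T')
    errors << "if used, 'T' must come at end of format string"
  end
  errors
end

valid    = format_errors('tmvT')
both     = format_errors('mvd')
early_t  = format_errors('mTv')
repeated = format_errors('mmv')
unknown  = format_errors('mx')
p valid, both, early_t, repeated, unknown
```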

#valid_timestamp?(timestamp) ⇒ Bool

Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Parameters:

  • timestamp (String, Integer)

    epoch timestamp

Returns:

  • (Bool)


# File 'lib/wavefront-cli/write.rb', line 407

def valid_timestamp?(timestamp)
  (timestamp.is_a?(Integer) ||
    timestamp.is_a?(String) && timestamp.match(/^\d+$/)) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (Time.now.to_i + 31_557_600)
end
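The two cut-offs are 946_684_800 (2000-01-01 00:00:00 UTC) and the current time plus 31_557_600 seconds (365.25 days). The sketch below is a testable variant under two assumptions: the current time is injected through a hypothetical now argument, and the method returns strict true or false.

```ruby
# Hypothetical variant of #valid_timestamp? with an injectable clock.
def valid_timestamp?(timestamp, now = Time.now.to_i)
  looks_numeric = timestamp.is_a?(Integer) ||
                  (timestamp.is_a?(String) && timestamp.match?(/\A\d+\z/))
  return false unless looks_numeric

  ts = timestamp.to_i
  # Later than 2000-01-01 UTC and less than a year ahead of now.
  ts > 946_684_800 && ts < now + 31_557_600
end

now = 1_700_000_100 # fixed clock for repeatable results

recent     = valid_timestamp?('1700000000', now)
too_old    = valid_timestamp?(915_148_800, now) # 1999-01-01 UTC
fractional = valid_timestamp?('1700000000.5', now)
far_future = valid_timestamp?(now + 31_557_601, now)
p [recent, too_old, fractional, far_future]
```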

#validate_opts ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 94

def validate_opts
  validate_opts_file if options[:file]

  if options[:using] == 'unix'
    return true if options[:socket]

    raise(WavefrontCli::Exception::CredentialError, 'No socket path.')
  end

  return true if options[:proxy]

  raise(WavefrontCli::Exception::CredentialError, 'No proxy address.')
end

#validate_opts_file ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 108

def validate_opts_file
  return true if options[:metric] || options[:infileformat]&.include?('m')

  raise(WavefrontCli::Exception::InsufficientData,
        "Supply a metric path in the file or with '-m'.")
end