Class: WavefrontCli::Write

Inherits:
Base
  • Object
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/write.rb

Overview

Send points via any method supported by the SDK

Constant Summary

SPLIT_PATTERN =
/\s(?=(?:[^"]|"[^"]*")*$)/.freeze
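As a rough illustration of what this pattern does, the sketch below splits a line on whitespace only when that whitespace sits outside double quotes. The input line and the helper name `split_fields` are made up for the example and are not part of the CLI.

```ruby
# The pattern, copied from the constant above.
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Hypothetical helper: split a line into fields, keeping quoted
# whitespace intact.
def split_fields(line)
  line.split(SPLIT_PATTERN)
end

# The space inside the quoted tag value is not treated as a separator,
# because an odd number of quotes follows it.
p split_fields('cpu.load 0.5 env="prod east"')
```

The lookahead only allows a split when the rest of the line contains balanced quotes, which is why a quoted tag value survives as a single field.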

Constants included from Constants

Constants::ALL_PAGE_SIZE, Constants::DEFAULT_CONFIG, Constants::DEFAULT_OPTS, Constants::EVENT_STATE_DIR, Constants::HUMAN_TIME_FORMAT, Constants::HUMAN_TIME_FORMAT_MS, Constants::SEARCH_SPLIT

Instance Attribute Summary

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary

Methods inherited from Base

#cannot_noop!, #check_response_blocks, #check_status, #cli_output_class, #conds_to_query, #descriptive_name, #dispatch, #display, #display_api_error, #display_class, #display_no_api_response, #do_delete, #do_describe, #do_dump, #do_import, #do_list, #do_search, #do_set, #do_undelete, #dump_json, #dump_yaml, #extract_values, #failed_validation_message, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #item_dump_call, #load_display_class, #matching_method, #method_word_list, #mk_opts, #name_of_do_method, #no_api_response, #ok_exit, #one_or_all, #options_and_exit, #parseable_output, #range_hash, #require_sdk_class, #run, #search_key, #smart_delete, #smart_delete_message, #status_error_handler, #unsupported_format_message, #validate_id, #validate_input, #validate_tags, #validator_exception, #validator_method, #warning_message

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmt ⇒ Object (readonly)

Returns the value of attribute fmt.



# File 'lib/wavefront-cli/write.rb', line 11

def fmt
  @fmt
end

Instance Method Details

#_sdk_class ⇒ Object

I chose to prioritise UI consistency over internal elegance here. The write command doesn’t follow the age-old assumption that each command maps 1:1 to a similarly named SDK class. Write can use either the Wavefront::Write or the Wavefront::Distribution SDK class.



# File 'lib/wavefront-cli/write.rb', line 68

def _sdk_class
  return 'Wavefront::Distribution' if distribution?

  'Wavefront::Write'
end

#call_write(data, openclose = true) ⇒ Object

A wrapper which lets us send normal points, deltas, or distributions



# File 'lib/wavefront-cli/write.rb', line 156

def call_write(data, openclose = true)
  if options[:delta]
    wf.write_delta(data, openclose)
  else
    wf.write(data, openclose)
  end
end

#close_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 117

def close_connection
  wf.close
end

#default_port ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 88

def default_port
  distribution? ? 40_000 : 2878
end

#distribution? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 74

def distribution?
  return true if options[:distribution]

  options[:infileformat]&.include?('d')
end

#do_distribution ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 35

def do_distribution
  send_point(make_distribution_point(tags_to_hash(options[:tag])))
end

#do_file ⇒ Object

# File 'lib/wavefront-cli/write.rb', line 29

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_point ⇒ Object

# File 'lib/wavefront-cli/write.rb', line 16

def do_point
  tags = tags_to_hash(options[:tag])

  p = { path: options[:'<metric>'],
        value: options[:'<value>'].delete('\\').to_f }

  p[:tags] = tags unless tags.empty?
  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#enough_fields?(line) ⇒ True

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

Parameters:

  • line (String)

    input line

Returns:

  • (True)

    if the number of fields is correct

Raises:

  • WavefrontCli::Exception::UnparseableInput if there are not the right number of fields.



# File 'lib/wavefront-cli/write.rb', line 387

def enough_fields?(line)
  ncols = line.split(SPLIT_PATTERN).length
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  raise(WavefrontCli::Exception::UnparseableInput,
        format('Expected %<expected>s fields, got %<got>s',
               expected: fmt.length,
               got: ncols))
end
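The column check can be tried in isolation with the sketch below. It is a standalone approximation: `fmt` is passed as an argument rather than read from the instance, and a plain `ArgumentError` stands in for `WavefrontCli::Exception::UnparseableInput`. Both changes are assumptions made so the snippet runs without the gem.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Standalone variant of the check: fmt is an array of single-character
# field codes, e.g. %w[m v T].
def enough_fields?(line, fmt)
  ncols = line.split(SPLIT_PATTERN).length
  # With a trailing 'T', any number of extra columns is read as tags.
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  # ArgumentError is a stand-in for the CLI's own exception class.
  raise ArgumentError,
        format('Expected %<expected>s fields, got %<got>s',
               expected: fmt.length, got: ncols)
end

puts enough_fields?('cpu.load 0.5 env=prod dc=eu', %w[m v T])

begin
  enough_fields?('cpu.load', %w[m v])
rescue ArgumentError => e
  puts e.message
end
```

Note that a format ending in 'T' still needs at least one tag column, because the column count must reach the length of the format string.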

#expand_dist(dist) ⇒ Object

Users can write a distribution as ‘1 1 1’, as ‘3x1’, or as a mix of the two.



# File 'lib/wavefront-cli/write.rb', line 199

def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end
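To make the shorthand concrete, here is the same method lifted out of the class so it can be run on its own. The input values are invented for the example.

```ruby
# Copy of the method above, outside the class. 'NxV' means "N copies
# of value V"; anything else is converted to a single float.
def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end

# Mixed notation: three 1s, a single 2, and two 0.5s.
p expand_dist(['3x1', '2', '2x0.5'])
```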

#extra_options ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 59

def extra_options
  options[:using] ? { writer: options[:using] } : {}
end

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the metric path

Raises:

  • TypeError if field does not exist



# File 'lib/wavefront-cli/write.rb', line 240

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]

  raise
end
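The three cases (path in the data, path as an option, or both) can be sketched without the `rescue` by checking for the 'm' field explicitly. The name `metric_path` and its explicit `fmt` and `prefix` arguments are assumptions for this example. The real method reads `fmt` and `options[:metric]` from the instance.

```ruby
# Hypothetical standalone variant. fmt is an array of field codes and
# prefix plays the role of options[:metric].
def metric_path(chunks, fmt, prefix = nil)
  index = fmt.index('m')

  if index.nil?
    # No metric column in the data: fall back to the option alone.
    raise TypeError, 'no metric field and no prefix' unless prefix

    return prefix
  end

  # Both present: the option is treated as a prefix.
  prefix ? [prefix, chunks[index]].join('.') : chunks[index]
end

p metric_path(['1.5', 'load'], %w[v m])             # path from data only
p metric_path(['1.5', 'load'], %w[v m], 'host.cpu') # option used as prefix
p metric_path(['1.5'], %w[v], 'host.cpu')           # option only
```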

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the source, if it is there, or if not, the value passed through by -H, or the local hostname.


# File 'lib/wavefront-cli/write.rb', line 255

def extract_source(chunks)
  chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end

#extract_tags(chunks) ⇒ Hash

Returns a hash of k = v tags.

Parameters:

  • chunks (Array)

    an input line broken into tokens. The final token will be a space-separated list of point tags.

Returns:

  • (Hash)

    of k = v tags.



# File 'lib/wavefront-cli/write.rb', line 227

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(SPLIT_PATTERN))
end

#extract_ts(chunks) ⇒ Float

Find and return the timestamp in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

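the timestamp, if the format string has a ‘t’ field and the value passes #valid_timestamp?, or the current UTC time if there is no ‘t’ field. As written, a ‘t’ value that fails validation yields nil.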



# File 'lib/wavefront-cli/write.rb', line 216

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the value

Raises:

  • TypeError if field does not exist

  • Wavefront::Exception::InvalidValue if it’s not a value



# File 'lib/wavefront-cli/write.rb', line 185

def extract_value(chunks)
  if fmt.include?('v')
    v = chunks[fmt.index('v')]
    v.to_f
  else
    raw = chunks[fmt.index('d')].split(',')
    xpanded = expand_dist(raw)
    wf.mk_distribution(xpanded)
  end
end

#format_string_does_not_have_v_and_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains both ‘v’ and ‘d’



# File 'lib/wavefront-cli/write.rb', line 348

def format_string_does_not_have_v_and_d?(fmt)
  return true unless fmt.include?('v') && fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "'v' and 'd' are mutually exclusive")
end

#format_string_has_big_t_only_at_the_end?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if ‘T’ is present but is not the last character



# File 'lib/wavefront-cli/write.rb', line 369

def format_string_has_big_t_only_at_the_end?(fmt)
  return true unless fmt.include?('T')
  return true if fmt.end_with?('T')

  raise(WavefrontCli::Exception::UnparseableInput,
        "if used, 'T' must come at end of format string")
end

#format_string_has_unique_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if any field appears more than once



# File 'lib/wavefront-cli/write.rb', line 362

def format_string_has_unique_chars?(fmt)
  return true if fmt.chars.sort == fmt.chars.uniq.sort

  raise(WavefrontCli::Exception::UnparseableInput,
        'repeated field in format string')
end

#format_string_has_v_or_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string has neither ‘v’ nor ‘d’



# File 'lib/wavefront-cli/write.rb', line 341

def format_string_has_v_or_d?(fmt)
  return true if fmt.include?('v') || fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "format string must include 'v' or 'd'")
end

#format_string_is_all_valid_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains an unsupported character



# File 'lib/wavefront-cli/write.rb', line 355

def format_string_is_all_valid_chars?(fmt)
  return true if fmt =~ /^[dmstTv]+$/

  raise(WavefrontCli::Exception::UnparseableInput,
        'unsupported field in format string')
end

#line_tags(chunks) ⇒ Object

Tags can come from the file, from the -T option, or both. They are merged, and the -T value wins if there is a collision.



# File 'lib/wavefront-cli/write.rb', line 301

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag]) || {}
  file_tags.merge(opt_tags)
end
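The collision rule follows from how `Hash#merge` works: keys in the argument override keys in the receiver. A minimal sketch, using a hypothetical `merge_tags` helper and invented tag values:

```ruby
# Hypothetical helper showing the precedence only. Tags from the
# command line are merged over tags read from the file.
def merge_tags(file_tags, opt_tags)
  file_tags.merge(opt_tags || {})
end

file_tags = { env: 'dev', dc: 'eu' }  # as parsed from an input line
opt_tags  = { env: 'prod' }           # as supplied with -T

# :env comes from the option, :dc is kept from the file.
p merge_tags(file_tags, opt_tags)
```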

#load_data(file) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 416

def load_data(file)
  IO.read(file)
rescue StandardError
  raise WavefrontCli::Exception::FileNotFound
end

#make_distribution_point(tags) ⇒ Object

# File 'lib/wavefront-cli/write.rb', line 40

def make_distribution_point(tags)
  { path: options[:'<metric>'],
    interval: options[:interval] || 'M',
    tags: tags,
    value: mk_dist }.tap do |p|
    p[:source] = options[:host] if options[:host]
    p[:ts] = parse_time(options[:time]) if options[:time]
  end
end

#mk_creds ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 80

def mk_creds
  { proxy: options[:proxy],
    port: options[:port] || default_port,
    socket: options[:socket],
    endpoint: options[:endpoint],
    token: options[:token] }
end

#mk_dist ⇒ Object

Turn our user’s representation of a distribution into one which suits Wavefront. The SDK can do this for us.



# File 'lib/wavefront-cli/write.rb', line 54

def mk_dist
  xpanded = expand_dist(options[:'<val>'])
  wf.mk_distribution(xpanded.map(&:to_f))
end

#open_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 113

def open_connection
  wf.open
end

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



# File 'lib/wavefront-cli/write.rb', line 130

def process_input(file)
  if file == '-'
    read_stdin
  else
    call_write(
      process_input_file(load_data(Pathname.new(file)).split("\n"))
    )
  end
end

#process_input_file(data) ⇒ Object

Parameters:

  • data (Array)

    lines of input



# File 'lib/wavefront-cli/write.rb', line 142

def process_input_file(data)
  data.each_with_object([]) do |l, a|
    begin
      a.<< process_line(l)
    rescue WavefrontCli::Exception::UnparseableInput => e
      puts "Bad input. #{e.message}."
      next
    end
  end
end

#process_line(line) ⇒ Hash

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.


Parameters:

  • line (String)

    a line of an input file

Returns:

  • (Hash)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the line doesn’t look right



# File 'lib/wavefront-cli/write.rb', line 276

def process_line(line)
  return true if line.empty?

  chunks = line.split(SPLIT_PATTERN, fmt.length)
  enough_fields?(line) # can raise exception

  point = { path: extract_path(chunks),
            value: extract_value(chunks) }

  tags = line_tags(chunks)

  point.tap do |p|
    p[:tags]     = tags unless tags.empty?
    p[:ts]       = extract_ts(chunks)        if fmt.include?('t')
    p[:source]   = extract_source(chunks)    if fmt.include?('s')
    p[:interval] = options[:interval] || 'm' if fmt.include?('d')
  end
end
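The key step is the limited split: passing `fmt.length` as the limit means everything past the last defined field stays together as one chunk, which is what lets a trailing 'T' absorb any number of tags. The sketch below pairs each format character with its chunk. The helper `chunk_line` and the sample line are assumptions for illustration and are not part of the CLI.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/.freeze

# Hypothetical helper: map each format character to the chunk it
# describes. fmt is a format string such as 'tmvT'.
def chunk_line(line, fmt)
  # The limit stops splitting once fmt.length chunks exist, so the
  # final chunk holds the whole remainder of the line.
  chunks = line.split(SPLIT_PATTERN, fmt.length)
  fmt.chars.zip(chunks).to_h
end

p chunk_line('1700000000 cpu.load 0.5 env=prod dc=eu', 'tmvT')
```

Here the 'T' entry holds both tags as a single space-separated string, which #extract_tags then splits again.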

#read_stdin ⇒ Object

Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



# File 'lib/wavefront-cli/write.rb', line 168

def read_stdin
  open_connection
  STDIN.each_line { |l| call_write(process_line(l.strip), false) }
  close_connection
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#send_point(point) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 121

def send_point(point)
  call_write(point)
rescue Wavefront::Exception::InvalidEndpoint
  abort format("Could not connect to proxy '%<proxy>s:%<port>s'.", options)
end

#setup_fmt(fmt) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 412

def setup_fmt(fmt)
  @fmt = fmt.split('')
end

#tags_to_hash(tags) ⇒ Hash

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.

Parameters:

  • tags (Array)

    key=value tags, as produced by docopt

Returns:

  • (Hash)

    of k: v tags



# File 'lib/wavefront-cli/write.rb', line 315

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k.to_sym] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end
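A standalone version of the conversion makes the dropping and quote-stripping rules easy to try. It differs slightly from the listing above in using non-mutating `gsub` for the key, which is a choice made for this sketch. The sample tags are invented.

```ruby
QUOTES = /^["']|["']$/.freeze

# Standalone variant of the method above.
def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    next unless v # not of the form key=val, so it is dropped

    ret[k.gsub(QUOTES, '').to_sym] = v.gsub(QUOTES, '')
  end
end

# One plain tag, one quoted tag, and one malformed entry.
p tags_to_hash(['env=prod', '"dc"="eu west"', 'junk'])
```

Splitting with a limit of 2 means a value may itself contain an equals sign without being truncated.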

#valid_format?(fmt) ⇒ Boolean

The format string must contain values. They can be single values or distributions, so it must have ‘v’ xor ‘d’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, ‘d’, or ‘v’, and the ‘T’, if there, must be at the end. No letter may appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 333

def valid_format?(fmt)
  format_string_has_v_or_d?(fmt)
  format_string_does_not_have_v_and_d?(fmt)
  format_string_is_all_valid_chars?(fmt)
  format_string_has_unique_chars?(fmt)
  format_string_has_big_t_only_at_the_end?(fmt)
end
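The five rules, applied in the same order as above, can be condensed into one function that returns the first failure message instead of raising. This is a sketch for experimenting with format strings, not the CLI's implementation. The name `format_error` is hypothetical, and it anchors the character check with `\A` and `\z` rather than `^` and `$`.

```ruby
# Hypothetical condensed validator: returns nil for a valid format
# string, or the message of the first rule it breaks.
def format_error(fmt)
  has_v = fmt.include?('v')
  has_d = fmt.include?('d')

  return "format string must include 'v' or 'd'" unless has_v || has_d
  return "'v' and 'd' are mutually exclusive" if has_v && has_d
  return 'unsupported field in format string' unless fmt.match?(/\A[dmstTv]+\z/)
  return 'repeated field in format string' unless fmt.chars.uniq.length == fmt.length
  return "if used, 'T' must come at end of format string" if fmt.include?('T') && !fmt.end_with?('T')

  nil
end

%w[tmvT mvd mxv mmv mTv mt].each do |fmt|
  puts format('%<fmt>-5s %<result>s', fmt: fmt, result: format_error(fmt) || 'ok')
end
```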

#valid_timestamp?(timestamp) ⇒ Bool

Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Parameters:

  • timestamp (String, Integer)

    epoch timestamp

Returns:

  • (Bool)


# File 'lib/wavefront-cli/write.rb', line 405

def valid_timestamp?(timestamp)
  (timestamp.is_a?(Integer) ||
    timestamp.is_a?(String) && timestamp.match(/^\d+$/)) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (Time.now.to_i + 31_557_600)
end
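The two magic numbers are 946_684_800 (2000-01-01 00:00:00 UTC as epoch seconds) and 31_557_600 (the seconds in 365.25 days). The sketch below is a variant that takes the current time as a second argument so the window can be checked against a fixed clock. That parameter and the use of `match?` are assumptions for this example.

```ruby
Y2K = 946_684_800          # 2000-01-01 00:00:00 UTC
ONE_YEAR = 31_557_600      # 365.25 days in seconds

# Variant of the check with an injectable clock. Accepts an Integer
# or an all-digit String.
def valid_timestamp?(timestamp, now = Time.now.to_i)
  numeric = timestamp.is_a?(Integer) ||
            (timestamp.is_a?(String) && timestamp.match?(/^\d+$/))

  numeric && timestamp.to_i > Y2K && timestamp.to_i < now + ONE_YEAR
end

now = 1_700_000_000 # a fixed, arbitrary "current time" for the demo
p valid_timestamp?('1699999999', now)   # inside the window
p valid_timestamp?(Y2K, now)            # boundary is exclusive
p valid_timestamp?('yesterday', now)    # not numeric
```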

#validate_opts ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 92

def validate_opts
  validate_opts_file if options[:file]

  if options[:using] == 'unix'
    return true if options[:socket]

    raise(WavefrontCli::Exception::CredentialError, 'No socket path.')
  end

  return true if options[:proxy]

  raise(WavefrontCli::Exception::CredentialError, 'No proxy address.')
end

#validate_opts_file ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 106

def validate_opts_file
  return true if options[:metric] || options[:infileformat]&.include?('m')

  raise(WavefrontCli::Exception::InsufficientData,
        "Supply a metric path in the file or with '-m'.")
end