Class: WavefrontCli::Write

Inherits:
Base
  • Object
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/write.rb

Overview

Send points via any method supported by the SDK.

Constant Summary

SPLIT_PATTERN =
/\s(?=(?:[^"]|"[^"]*")*$)/
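The pattern splits on whitespace unless that whitespace sits inside a double-quoted string. A minimal sketch of the effect follows; the input line is a made-up example, not taken from the CLI's documentation.

```ruby
# Same pattern as the constant above: split on whitespace that is not
# inside a double-quoted string.
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/

# Hypothetical input line: metric path, value, then two point tags.
line = 'dev.cli.test 1.5 env=prod owner="Jane Doe"'

# The space inside "Jane Doe" is not treated as a separator.
p line.split(SPLIT_PATTERN)

# With a limit, everything after the first two fields stays in one chunk.
p line.split(SPLIT_PATTERN, 3)
```

The limited form is the one #process_line relies on: the final chunk keeps all remaining tokens together so they can be handled as point tags.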

Constants included from Constants

Constants::ALL_PAGE_SIZE, Constants::DEFAULT_CONFIG, Constants::DEFAULT_OPTS, Constants::EVENT_STATE_DIR, Constants::HUMAN_TIME_FORMAT, Constants::HUMAN_TIME_FORMAT_MS, Constants::SEARCH_SPLIT

Instance Attribute Summary

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary

Methods inherited from Base

#cannot_noop!, #check_response_blocks, #check_status, #cli_output_class, #conds_to_query, #descriptive_name, #dispatch, #display, #display_api_error, #display_class, #display_no_api_response, #do_delete, #do_describe, #do_dump, #do_import, #do_list, #do_search, #do_set, #do_undelete, #dump_json, #dump_yaml, #extract_values, #failed_validation_message, #format_var, #handle_error, #handle_response, #hcl_fields, #import_to_create, #initialize, #item_dump_call, #load_display_class, #matching_method, #method_word_list, #mk_opts, #name_of_do_method, #no_api_response, #ok_exit, #one_or_all, #options_and_exit, #parseable_output, #range_hash, #require_sdk_class, #run, #search_key, #smart_delete, #smart_delete_message, #status_error_handler, #unsupported_format_message, #validate_id, #validate_input, #validate_tags, #validator_exception, #validator_method, #warning_message

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmt ⇒ Object (readonly)

Returns the value of attribute fmt.



# File 'lib/wavefront-cli/write.rb', line 11

def fmt
  @fmt
end

Instance Method Details

#_sdk_class ⇒ Object

I chose to prioritise UI consistency over internal elegance here. The `write` command doesn’t follow the age-old assumption that each command maps 1:1 to a similarly named SDK class. Write can use `write` or `distribution`.



# File 'lib/wavefront-cli/write.rb', line 98

def _sdk_class
  return 'Wavefront::Distribution' if distribution?

  'Wavefront::Write'
end

#call_write(data, openclose = true) ⇒ Object

A wrapper which lets us send normal points, deltas, or distributions



# File 'lib/wavefront-cli/write.rb', line 176

def call_write(data, openclose = true)
  if options[:delta]
    wf.write_delta(data, openclose)
  else
    wf.write(data, openclose)
  end
end

#close_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 139

def close_connection
  wf.close
end

#default_port ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 119

def default_port
  distribution? ? 40_000 : 2878
end

#distribution? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 104

def distribution?
  return true if options[:distribution]

  options[:infileformat]&.include?('d')
end

#do_distribution ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 36

def do_distribution
  send_point(make_distribution_point(tags_to_hash(options[:tag])))
end

#do_file ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 30

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_noise ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 40

def do_noise
  loop do
    do_point(random_value(options[:min] || -10, options[:max] || 10))
    sleep(sleep_time)
  end
end

#do_point(value = options[:'<value>']) ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 17

def do_point(value = options[:'<value>'])
  tags = tags_to_hash(options[:tag])

  p = { path: options[:'<metric>'],
        value: sane_value(value) }

  p[:tags] = tags unless tags.empty?
  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]
  send_point(p)
end

#enough_fields?(line) ⇒ True

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

Parameters:

  • line (String)

    input line

Returns:

  • (True)

    if the number of fields is correct

Raises:

  • WavefrontCli::Exception::UnparseableInput if there are not the right number of fields.



# File 'lib/wavefront-cli/write.rb', line 408

def enough_fields?(line)
  ncols = line.split(SPLIT_PATTERN).length
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  raise(WavefrontCli::Exception::UnparseableInput,
        format('Expected %<expected>s fields, got %<got>s',
               expected: fmt.length,
               got: ncols))
end
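The column check can be tried in isolation. This is a sketch under assumptions: `fmt` is passed in as an array of format characters instead of being read from the attribute, and ArgumentError stands in for WavefrontCli::Exception::UnparseableInput. The input lines are invented.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/

# Stand-alone variant of the check above. ArgumentError is a stand-in for
# WavefrontCli::Exception::UnparseableInput.
def enough_fields?(line, fmt)
  ncols = line.split(SPLIT_PATTERN).length
  # A trailing 'T' soaks up any number of extra columns as point tags.
  return true if fmt.include?('T') && ncols >= fmt.length
  return true if ncols == fmt.length

  raise ArgumentError, format('Expected %<expected>s fields, got %<got>s',
                              expected: fmt.length, got: ncols)
end

enough_fields?('cpu.load 0.75', %w[m v])                   # exact match
enough_fields?('cpu.load 0.75 env=prod dc=lon', %w[m v T]) # extra columns are tags

begin
  enough_fields?('cpu.load 0.75 stray', %w[m v])
rescue ArgumentError => e
  puts e.message
end
```

Without a 'T' in the format, a stray third column is rejected rather than silently ignored, which is the precaution the description refers to.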

#expand_dist(dist) ⇒ Object

We let users write a distribution as ‘1 1 1’, as ‘3x1’, or as a mix of the two. A token of the form ‘NxV’ expands to N copies of the value V.



# File 'lib/wavefront-cli/write.rb', line 222

def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end
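To see the expansion at work, the method can be copied out as a free-standing function. The inputs below are illustrative only.

```ruby
# Free-standing copy of the method above.
def expand_dist(dist)
  dist.map do |v|
    if v.is_a?(String) && v.include?('x')
      # 'NxV' becomes N copies of V
      x, val = v.split('x', 2)
      Array.new(x.to_i, val.to_f)
    else
      v.to_f
    end
  end.flatten
end

p expand_dist(%w[1 1 1])         # plain values
p expand_dist(%w[3x1])           # shorthand for the same distribution
p expand_dist(['2x0.5', '7', 3]) # a mix of both styles
```

Both of the first two calls produce the same three-element array of floats, so the shorthand is purely a convenience for the user.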

#extra_options ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 85

def extra_options
  if options[:using]
    { writer: options[:using] }
  else
    { writer: :http }
  end
end

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If both, we assume the option is a prefix, and append the value in the data to it, joined with a dot.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the metric path

Raises:

  • TypeError if the field does not exist and no metric option was given



# File 'lib/wavefront-cli/write.rb', line 263

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]

  raise
end
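The rescue works because indexing an Array with nil raises TypeError, and `fmt.index('m')` is nil when the format has no 'm'. A sketch of the three cases, assuming `fmt` and the prefix are passed as arguments (the real method reads them from the instance and from `options[:metric]`):

```ruby
# Stand-alone sketch: `fmt` and `prefix` are explicit arguments here.
def extract_path(chunks, fmt, prefix = nil)
  m = chunks[fmt.index('m')] # index is nil when fmt has no 'm'
  prefix ? [prefix, m].join('.') : m
rescue TypeError
  # chunks[nil] raised: fall back to the option if there is one.
  return prefix if prefix

  raise
end

p extract_path(%w[cpu.load 1], %w[m v])        # path from the data
p extract_path(%w[load 1], %w[m v], 'dev.cpu') # option used as a prefix
p extract_path(%w[1], %w[v], 'dev.cpu')        # path from the option alone
```

With neither an 'm' field nor a prefix, the TypeError propagates to the caller.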

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (String)

    the source, if it is there, or if not, the value passed through by -H, or the local hostname.


# File 'lib/wavefront-cli/write.rb', line 278

def extract_source(chunks)
  chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end

#extract_tags(chunks) ⇒ Hash

Returns a Hash of k = v tags.

Parameters:

  • chunks (Array)

    an input line broken into tokens. The final token will be a space-separated list of point tags.

Returns:

  • (Hash)

    of k = v tags.



# File 'lib/wavefront-cli/write.rb', line 250

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(SPLIT_PATTERN))
end

#extract_ts(chunks) ⇒ Float

Find and return the timestamp in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the timestamp, if it is there, or the current UTC time if it is not.



# File 'lib/wavefront-cli/write.rb', line 239

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input.

Parameters:

  • chunks (Array)

    a chunked line of input from #process_line

Returns:

  • (Float)

    the value

Raises:

  • TypeError if field does not exist

  • Wavefront::Exception::InvalidValue if it’s not a value



# File 'lib/wavefront-cli/write.rb', line 208

def extract_value(chunks)
  if fmt.include?('v')
    v = chunks[fmt.index('v')]
    v.to_f
  else
    raw = chunks[fmt.index('d')].split(',')
    xpanded = expand_dist(raw)
    wf.mk_distribution(xpanded)
  end
end

#format_string_does_not_have_v_and_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains both 'v' and 'd'



# File 'lib/wavefront-cli/write.rb', line 369

def format_string_does_not_have_v_and_d?(fmt)
  return true unless fmt.include?('v') && fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "'v' and 'd' are mutually exclusive")
end

#format_string_has_big_t_only_at_the_end?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

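Raises:

  • WavefrontCli::Exception::UnparseableInput if 'T' appears anywhere other than the end of the format string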



# File 'lib/wavefront-cli/write.rb', line 390

def format_string_has_big_t_only_at_the_end?(fmt)
  return true unless fmt.include?('T')
  return true if fmt.end_with?('T')

  raise(WavefrontCli::Exception::UnparseableInput,
        "if used, 'T' must come at end of format string")
end

#format_string_has_unique_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

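Raises:

  • WavefrontCli::Exception::UnparseableInput if any field appears more than once in the format string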



# File 'lib/wavefront-cli/write.rb', line 383

def format_string_has_unique_chars?(fmt)
  return true if fmt.chars.sort == fmt.chars.uniq.sort

  raise(WavefrontCli::Exception::UnparseableInput,
        'repeated field in format string')
end

#format_string_has_v_or_d?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains neither 'v' nor 'd'



# File 'lib/wavefront-cli/write.rb', line 362

def format_string_has_v_or_d?(fmt)
  return true if fmt.include?('v') || fmt.include?('d')

  raise(WavefrontCli::Exception::UnparseableInput,
        "format string must include 'v' or 'd'")
end

#format_string_is_all_valid_chars?(fmt) ⇒ Boolean

Returns:

  • (Boolean)

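Raises:

  • WavefrontCli::Exception::UnparseableInput if the format string contains a character other than 'd', 'm', 's', 't', 'T' or 'v'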



# File 'lib/wavefront-cli/write.rb', line 376

def format_string_is_all_valid_chars?(fmt)
  return true if /^[dmstTv]+$/.match?(fmt)

  raise(WavefrontCli::Exception::UnparseableInput,
        'unsupported field in format string')
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, letting the -T value win if there is a collision.



# File 'lib/wavefront-cli/write.rb', line 322

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag]) || {}
  file_tags.merge(opt_tags)
end
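The collision rule falls out of Hash#merge, where values from the argument replace those in the receiver. A small sketch; `merge_tags` is a hypothetical helper name and the tag values are invented.

```ruby
# Hypothetical helper showing the precedence: tags given as an option
# override tags of the same name read from the file.
def merge_tags(file_tags, opt_tags)
  file_tags.merge(opt_tags || {})
end

file_tags = { env: 'dev', dc: 'lon' } # as parsed from an input line
opt_tags  = { env: 'prod' }           # as given on the command line

p merge_tags(file_tags, opt_tags)
p merge_tags(file_tags, nil)          # no option tags: file tags pass through
```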

#load_data(file) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 437

def load_data(file)
  File.read(file)
rescue StandardError
  raise WavefrontCli::Exception::FileNotFound
end

#make_distribution_point(tags) ⇒ Object




# File 'lib/wavefront-cli/write.rb', line 66

def make_distribution_point(tags)
  { path: options[:'<metric>'],
    interval: options[:interval] || 'M',
    tags: tags,
    value: mk_dist }.tap do |p|
    p[:source] = options[:host] if options[:host]
    p[:ts] = parse_time(options[:time]) if options[:time]
  end
end

#mk_creds ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 110

def mk_creds
  { proxy: options[:proxy],
    port: options[:port] || default_port,
    socket: options[:socket],
    endpoint: options[:endpoint],
    agent: "wavefront-cli-#{WF_CLI_VERSION}",
    token: options[:token] }
end

#mk_dist ⇒ Object

Turn our user’s representation of a distribution into one which suits Wavefront. The SDK can do this for us.



# File 'lib/wavefront-cli/write.rb', line 80

def mk_dist
  xpanded = expand_dist(options[:'<val>'])
  wf.mk_distribution(xpanded.map(&:to_f))
end

#open_connection ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 135

def open_connection
  wf.open
end

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



# File 'lib/wavefront-cli/write.rb', line 152

def process_input(file)
  if file == '-'
    read_stdin
  else
    call_write(
      process_input_file(load_data(Pathname.new(file)).split("\n"))
    )
  end
end

#process_input_file(data) ⇒ Object

Parameters:

  • data (Array)

    lines of input



# File 'lib/wavefront-cli/write.rb', line 164

def process_input_file(data)
  data.each_with_object([]) do |l, a|
    a << process_line(l)
  rescue WavefrontCli::Exception::UnparseableInput => e
    puts "Bad input. #{e.message}."
    next
  end
end

#process_line(line) ⇒ Hash

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.


Parameters:

  • line (String)

    a line of an input file

Returns:

  • (Hash)

Raises:

  • WavefrontCli::Exception::UnparseableInput if the line doesn’t look right



# File 'lib/wavefront-cli/write.rb', line 298

def process_line(line)
  return true if line.empty?

  chunks = line.split(SPLIT_PATTERN, fmt.length)
  enough_fields?(line) # can raise exception

  point = { path: extract_path(chunks),
            value: extract_value(chunks) }

  tags = line_tags(chunks)

  point.tap do |p|
    p[:tags]     = tags unless tags.empty?
    p[:ts]       = extract_ts(chunks)        if fmt.include?('t')
    p[:source]   = extract_source(chunks)    if fmt.include?('s')
    p[:interval] = options[:interval] || 'm' if fmt.include?('d')
  end
end
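The flow is easier to follow with a concrete line. The following is a simplified sketch, not the method itself: `parse_line` is a hypothetical name, it takes the format as an argument, handles only the 'm', 'v', 't' and 'T' fields, and skips all validation.

```ruby
SPLIT_PATTERN = /\s(?=(?:[^"]|"[^"]*")*$)/

# Simplified stand-alone sketch of the parsing flow described above.
def parse_line(line, fmt)
  # Limit the split so everything past the named fields lands in one chunk.
  chunks = line.split(SPLIT_PATTERN, fmt.length)
  point = { path: chunks[fmt.index('m')],
            value: chunks[fmt.index('v')].to_f }
  point[:ts] = chunks[fmt.index('t')].to_i if fmt.include?('t')

  if fmt.last == 'T'
    tags = chunks.last.split(SPLIT_PATTERN).each_with_object({}) do |t, h|
      k, v = t.split('=', 2)
      h[k.to_sym] = v.delete('"') if v # drop anything not of the form k=v
    end
    point[:tags] = tags unless tags.empty?
  end
  point
end

p parse_line('1700000000 cpu.load 0.75 env=prod owner="Jane Doe"', %w[t m v T])
```

The order of the characters in the format decides which column is read as which field, so the same line with a format of `m t v T` would be misread.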

#random_value(min, max) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 47

def random_value(min, max)
  return min if min == max

  rand(max.to_f - min.to_f) + min.to_f
end

#read_stdin ⇒ Object

Read from standard input and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



# File 'lib/wavefront-cli/write.rb', line 188

def read_stdin
  open_connection
  ret = $stdin.each_line.map do |l|
    call_write(process_line(l.strip), false)
  end
  close_connection
  ret.last
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#sane_value(value) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 53

def sane_value(value)
  return value if value.is_a?(Numeric)

  raise WavefrontCli::Exception::InvalidValue unless value.is_a?(String)

  value.delete('\\').to_f
end

#send_point(point) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 143

def send_point(point)
  call_write(point)
rescue Wavefront::Exception::InvalidEndpoint
  abort format("Could not connect to proxy '%<proxy>s:%<port>s'.", wf.creds)
end

#setup_fmt(fmt) ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 433

def setup_fmt(fmt)
  @fmt = fmt.chars
end

#sleep_time ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 61

def sleep_time
  options[:interval] ? options[:interval].to_f : 1
end

#tags_to_hash(tags) ⇒ Hash

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.

Parameters:

  • tags (Array)

    key=value tags, as produced by docopt

Returns:

  • (Hash)

    of k: v tags



# File 'lib/wavefront-cli/write.rb', line 336

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k.to_sym] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end
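A sketch of the conversion on some invented tags. This is an adapted copy: it uses the non-mutating `gsub` instead of `gsub!`, which is a choice made for the example and not a change in the source.

```ruby
# Adapted stand-alone copy of the method above.
def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    next unless v # not of the form key=val, so drop it

    # Strip one leading and one trailing quote from key and value.
    ret[k.gsub(/^["']|["']$/, '').to_sym] = v.gsub(/^["']|["']$/, '')
  end
end

p tags_to_hash(['env=prod', '"owner"="Jane Doe"', 'junk'])
p tags_to_hash('dc=lon') # a single string is accepted too
p tags_to_hash(nil)
```

Because the split is limited to two parts, a value may itself contain an equals sign.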

#valid_format?(fmt) ⇒ Boolean

The format string must contain values. They can be single values or distributions. So we must have ‘v’ xor ‘d’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, ‘d’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)


# File 'lib/wavefront-cli/write.rb', line 354

def valid_format?(fmt)
  format_string_has_v_or_d?(fmt)
  format_string_does_not_have_v_and_d?(fmt)
  format_string_is_all_valid_chars?(fmt)
  format_string_has_unique_chars?(fmt)
  format_string_has_big_t_only_at_the_end?(fmt)
end
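The five rules can be exercised together in one small function. This is a sketch, not the CLI's code: `format_error` is a hypothetical name, and it returns the first failure message (or nil) where the real methods raise WavefrontCli::Exception::UnparseableInput.

```ruby
# Stand-alone sketch of the five checks, in the same order as above.
# Returns the first failure message, or nil when the format is valid.
def format_error(fmt)
  return "format string must include 'v' or 'd'" unless fmt.match?(/[vd]/)
  return "'v' and 'd' are mutually exclusive" if fmt.include?('v') && fmt.include?('d')
  return 'unsupported field in format string' unless /^[dmstTv]+$/.match?(fmt)
  return 'repeated field in format string' unless fmt.chars == fmt.chars.uniq
  return "if used, 'T' must come at end of format string" if fmt.include?('T') && !fmt.end_with?('T')

  nil
end

%w[tmvT md mts mvd mvx mmv mTv].each do |fmt|
  puts format('%-5<fmt>s %<result>s', fmt: fmt, result: format_error(fmt) || 'ok')
end
```

Only the first two formats in the list pass; each of the others trips exactly one rule.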

#valid_timestamp?(timestamp) ⇒ Bool

Although the SDK does value checking, we’ll add another layer of input checking here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Parameters:

  • timestamp (String, Integer)

    epoch timestamp

Returns:

  • (Bool)


# File 'lib/wavefront-cli/write.rb', line 426

def valid_timestamp?(timestamp)
  (timestamp.is_a?(Integer) ||
    (timestamp.is_a?(String) && timestamp.match(/^\d+$/))) &&
    timestamp.to_i > 946_684_800 &&
    timestamp.to_i < (Time.now.to_i + 31_557_600)
end

#validate_opts ⇒ Object

The SDK writer plugins validate the credentials they need.



# File 'lib/wavefront-cli/write.rb', line 124

def validate_opts
  validate_opts_file if options[:file]
end

#validate_opts_file ⇒ Object



# File 'lib/wavefront-cli/write.rb', line 128

def validate_opts_file
  return true if options[:metric] || options[:infileformat]&.include?('m')

  raise(WavefrontCli::Exception::InsufficientData,
        "Supply a metric path in the file or with '-m'.")
end