Class: WavefrontCli::Write

Inherits:
Base
  • Object
show all
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/write.rb

Overview

Send points to a proxy.

rubocop:disable Metrics/ClassLength

Instance Attribute Summary collapse

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary collapse

Methods inherited from Base

#check_status, #dispatch, #display, #do_delete, #do_describe, #do_import, #do_list, #do_search, #do_tag_add, #do_tag_clear, #do_tag_delete, #do_tag_set, #do_tags, #do_undelete, #do_update, #format_var, #handle_error, #handle_response, #import_to_create, #initialize, #load_display_class, #load_file, #mk_opts, #run, #validate_id, #validate_input, #validate_tags, #validator_exception, #validator_method

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmtObject (readonly)

Returns the value of attribute fmt.



10
11
12
# File 'lib/wavefront-cli/write.rb', line 10

def fmt
  @fmt
end

Instance Method Details

#do_fileObject



35
36
37
38
39
# File 'lib/wavefront-cli/write.rb', line 35

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_pointObject

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/MethodLength



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/wavefront-cli/write.rb', line 19

def do_point
  p = { path:  options[:'<metric>'],
        value: options[:'<value>'].to_f,
        tags:  tags_to_hash(options[:tag]) }

  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]

  begin
    wf.write(p)
  rescue Wavefront::Exception::InvalidEndpoint
    abort 'could not speak to proxy ' \
          "'#{options[:proxy]}:#{options[:port]}'."
  end
end

#enough_fields?(l) ⇒ Boolean

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

If the format string says we are expecting point tags, we may have more columns than the length of the format string.

Returns:

  • (Boolean)


205
206
207
208
209
210
211
212
213
214
215
# File 'lib/wavefront-cli/write.rb', line 205

def enough_fields?(l)
  ncols = l.split.length

  if fmt.include?('T')
    return false unless ncols >= fmt.length
  else
    return false unless ncols == fmt.length
  end

  true
end

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.

param chunks [Array] a chunked line of input from #process_line return [String] the metric path raise TypeError if field does not exist



109
110
111
112
113
114
115
# File 'lib/wavefront-cli/write.rb', line 109

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  return options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]
  raise
end

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

param chunks [Array] a chunked line of input from #process_line return [String] the source, if it is there, or if not, the

value passed through by -H, or the local hostname.


123
124
125
126
127
# File 'lib/wavefront-cli/write.rb', line 123

def extract_source(chunks)
  return chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end

#extract_tags(chunks) ⇒ Object



96
97
98
# File 'lib/wavefront-cli/write.rb', line 96

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(/\s(?=(?:[^"]|"[^"]*")*$)/))
end

#extract_ts(chunks) ⇒ Object

Find and return the source in a chunked line of input.

param chunks [Array] a chunked line of input from #process_line return [Float] the timestamp, if it is there, or the current

UTC time if it is not.

raise TypeError if field does not exist



89
90
91
92
93
94
# File 'lib/wavefront-cli/write.rb', line 89

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input

param chunks [Array] a chunked line of input from #process_line return [Float] the value raise TypeError if field does not exist raise Wavefront::Exception::InvalidValue if it’s not a value



77
78
79
80
# File 'lib/wavefront-cli/write.rb', line 77

def extract_value(chunks)
  v = chunks[fmt.index('v')]
  v.to_f
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, making the -T win if there is a collision.



157
158
159
160
161
# File 'lib/wavefront-cli/write.rb', line 157

def line_tags(chunks)
  file_tags = fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag])
  file_tags.merge(opt_tags)
end

#mk_credsObject



13
14
15
# File 'lib/wavefront-cli/write.rb', line 13

def mk_creds
  { proxy: options[:proxy], port: options[:port] || 2878 }
end

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/wavefront-cli/write.rb', line 44

def process_input(file)
  if file == '-'
    read_stdin
  else
    data = load_data(Pathname.new(file)).split("\n").map do |l|
      process_line(l)
    end

    wf.write(data)
  end
end

#process_line(l) ⇒ Object

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/wavefront-cli/write.rb', line 136

def process_line(l)
  return true if l.empty?
  chunks = l.split(/\s+/, fmt.length)
  raise 'wrong number of fields' unless enough_fields?(l)

  begin
    point = { path:  extract_path(chunks),
              value: extract_value(chunks) }
    point[:ts] = extract_ts(chunks) if fmt.include?('t')
    point[:source] = extract_source(chunks) if fmt.include?('s')
    point[:tags] = line_tags(chunks)
  rescue TypeError
    raise "could not process #{l}"
  end

  point
end

#read_stdinObject

Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



60
61
62
63
64
65
66
67
68
# File 'lib/wavefront-cli/write.rb', line 60

def read_stdin
  wf.open
  STDIN.each_line { |l| wf.write(process_line(l.strip), false) }
  wf.close
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#tags_to_hash(tags) ⇒ Object

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.

return Hash

Parameters:

  • tags (Array)


171
172
173
174
175
176
177
178
179
# File 'lib/wavefront-cli/write.rb', line 171

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end

#valid_format?(fmt) ⇒ Boolean

The format string must contain a ‘v’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)


188
189
190
191
192
193
194
195
# File 'lib/wavefront-cli/write.rb', line 188

def valid_format?(fmt)
  if fmt.include?('v') && fmt.match(/^[mstv]+T?$/) &&
     fmt == fmt.split('').uniq.join
    return true
  end

  raise 'Invalid format string.'
end

#valid_timestamp?(ts) ⇒ Boolean

Although the SDK does value checking, we’ll add another layer of input checing here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Returns:

  • (Boolean)


222
223
224
225
# File 'lib/wavefront-cli/write.rb', line 222

def valid_timestamp?(ts)
  (ts.is_a?(Integer) || ts.match(/^\d+$/)) &&
    ts.to_i > 946684800 && ts.to_i < (Time.now.to_i + 31557600)
end

#validate_optsObject



227
228
229
230
231
232
233
# File 'lib/wavefront-cli/write.rb', line 227

def validate_opts
  unless options[:metric] || options[:format].include?('m')
    abort "Supply a metric path in the file or with '-m'."
  end

  raise 'Please supply a proxy address.' unless options[:proxy]
end