Class: WavefrontCli::Write

Inherits:
Base
  • Object
show all
Includes:
Wavefront::Mixins
Defined in:
lib/wavefront-cli/write.rb

Overview

Send points to a proxy.

Instance Attribute Summary collapse

Attributes inherited from Base

#klass, #klass_word, #options, #wf

Instance Method Summary collapse

Methods inherited from Base

#check_status, #dispatch, #display, #do_delete, #do_describe, #do_import, #do_list, #do_tag_add, #do_tag_clear, #do_tag_delete, #do_tag_set, #do_tags, #do_undelete, #do_update, #format_var, #handle_error, #handle_response, #import_to_create, #initialize, #load_display_class, #load_file, #mk_opts, #run, #validate_id, #validate_input, #validate_tags, #validator_exception, #validator_method

Constructor Details

This class inherits a constructor from WavefrontCli::Base

Instance Attribute Details

#fmtObject (readonly)

Returns the value of attribute fmt.



9
10
11
# File 'lib/wavefront-cli/write.rb', line 9

def fmt
  @fmt
end

Instance Method Details

#do_fileObject



32
33
34
35
36
# File 'lib/wavefront-cli/write.rb', line 32

def do_file
  valid_format?(options[:infileformat])
  setup_fmt(options[:infileformat] || 'tmv')
  process_input(options[:'<file>'])
end

#do_pointObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/wavefront-cli/write.rb', line 16

def do_point
  p = { path:  options[:'<metric>'],
        value: options[:'<value>'].to_f,
        tags:  tags_to_hash(options[:tag]) }

  p[:source] = options[:host] if options[:host]
  p[:ts] = parse_time(options[:time]) if options[:time]

  begin
    wf.write(p)
  rescue Wavefront::Exception::InvalidEndpoint
    abort 'could not speak to proxy ' \
          "'#{options[:proxy]}:#{options[:port]}'."
  end
end

#enough_fields?(l) ⇒ Boolean

Make sure we have the right number of columns, according to the format string. We want to take every precaution we can to stop users accidentally polluting their metric namespace with junk.

If the format string says we are expecting point tags, we may have more columns than the length of the format string.

Returns:

  • (Boolean)


202
203
204
205
206
207
208
209
210
211
212
# File 'lib/wavefront-cli/write.rb', line 202

def enough_fields?(l)
  ncols = l.split.length

  if fmt.include?('T')
    return false unless ncols >= fmt.length
  else
    return false unless ncols == fmt.length
  end

  true
end

#extract_path(chunks) ⇒ Object

Find and return the metric path in a chunked line of input. The path can be in the data, or passed as an option, or both. If the latter, then we assume the option is a prefix, and concatenate the value in the data.

param chunks [Array] a chunked line of input from #process_line return [String] the metric path raise TypeError if field does not exist



106
107
108
109
110
111
112
# File 'lib/wavefront-cli/write.rb', line 106

def extract_path(chunks)
  m = chunks[fmt.index('m')]
  return options[:metric] ? [options[:metric], m].join('.') : m
rescue TypeError
  return options[:metric] if options[:metric]
  raise
end

#extract_source(chunks) ⇒ Object

Find and return the source in a chunked line of input.

param chunks [Array] a chunked line of input from #process_line return [String] the source, if it is there, or if not, the

value passed through by -H, or the local hostname.


120
121
122
123
124
# File 'lib/wavefront-cli/write.rb', line 120

def extract_source(chunks)
  return chunks[fmt.index('s')]
rescue TypeError
  options[:source] || Socket.gethostname
end

#extract_tags(chunks) ⇒ Object



93
94
95
# File 'lib/wavefront-cli/write.rb', line 93

def extract_tags(chunks)
  tags_to_hash(chunks.last.split(/\s(?=(?:[^"]|"[^"]*")*$)/))
end

#extract_ts(chunks) ⇒ Object

Find and return the source in a chunked line of input.

param chunks [Array] a chunked line of input from #process_line return [Float] the timestamp, if it is there, or the current

UTC time if it is not.

raise TypeError if field does not exist



86
87
88
89
90
91
# File 'lib/wavefront-cli/write.rb', line 86

def extract_ts(chunks)
  ts = chunks[fmt.index('t')]
  return parse_time(ts) if valid_timestamp?(ts)
rescue TypeError
  Time.now.utc.to_i
end

#extract_value(chunks) ⇒ Object

Find and return the value in a chunked line of input

param chunks [Array] a chunked line of input from #process_line return [Float] the value raise TypeError if field does not exist raise Wavefront::Exception::InvalidValue if it’s not a value



74
75
76
77
# File 'lib/wavefront-cli/write.rb', line 74

def extract_value(chunks)
  v = chunks[fmt.index('v')]
  v.to_f
end

#line_tags(chunks) ⇒ Object

We can get tags from the file, from the -T option, or both. Merge them, making the -T win if there is a collision.



154
155
156
157
158
# File 'lib/wavefront-cli/write.rb', line 154

def line_tags(chunks)
  file_tags =  fmt.last == 'T' ? extract_tags(chunks) : {}
  opt_tags = tags_to_hash(options[:tag])
  file_tags.merge(opt_tags)
end

#mk_credsObject



12
13
14
# File 'lib/wavefront-cli/write.rb', line 12

def mk_creds
  { proxy: options[:proxy], port: options[:port] || 2878 }
end

#process_input(file) ⇒ Object

Read the input, from a file or from STDIN, and turn each line into Wavefront points.



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/wavefront-cli/write.rb', line 41

def process_input(file)
  if file == '-'
    read_stdin
  else
    data = load_data(Pathname.new(file)).split("\n").map do |l|
      process_line(l)
    end

    wf.write(data)
  end
end

#process_line(l) ⇒ Object

Process a line of input, as described by the format string held in @fmt. Produces a hash suitable for the SDK to send on.

We let the user define most of the fields, but anything beyond what they define is always assumed to be point tags. This is because you can have arbitrarily many of those for each point.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/wavefront-cli/write.rb', line 133

def process_line(l)
  return true if l.empty?
  chunks = l.split(/\s+/, fmt.length)
  raise 'wrong number of fields' unless enough_fields?(l)

  begin
    point = { path:  extract_path(chunks),
              value: extract_value(chunks) }
    point[:ts] = extract_ts(chunks) if fmt.include?('t')
    point[:source] = extract_source(chunks) if fmt.include?('s')
    point[:tags] = line_tags(chunks)
  rescue TypeError
    raise "could not process #{l}"
  end

  point
end

#read_stdinObject

Read from standard in and stream points through an open socket. If the user hits ctrl-c, close the socket and exit politely.



57
58
59
60
61
62
63
64
65
# File 'lib/wavefront-cli/write.rb', line 57

def read_stdin
  wf.open
  STDIN.each_line { |l| wf.write(process_line(l.strip), false) }
  wf.close
rescue SystemExit, Interrupt
  puts 'ctrl-c. Exiting.'
  wf.close
  exit 0
end

#tags_to_hash(tags) ⇒ Object

Takes an array of key=value tags (as produced by docopt) and turns it into a hash of key: value tags. Anything not of the form key=val is dropped. If key or value are quoted, we remove the quotes.

return Hash

Parameters:

  • tags (Array)


168
169
170
171
172
173
174
175
176
# File 'lib/wavefront-cli/write.rb', line 168

def tags_to_hash(tags)
  return nil unless tags

  [tags].flatten.each_with_object({}) do |t, ret|
    k, v = t.split('=', 2)
    k.gsub!(/^["']|["']$/, '')
    ret[k] = v.to_s.gsub(/^["']|["']$/, '') if v
  end
end

#valid_format?(fmt) ⇒ Boolean

The format string must contain a ‘v’. It must not contain anything other than ‘m’, ‘t’, ‘T’, ‘s’, or ‘v’, and the ‘T’, if there, must be at the end. No letter must appear more than once.

Parameters:

  • fmt (String)

    format of input file

Returns:

  • (Boolean)


185
186
187
188
189
190
191
192
# File 'lib/wavefront-cli/write.rb', line 185

def valid_format?(fmt)
  if fmt.include?('v') && fmt.match(/^[mstv]+T?$/) &&
     fmt == fmt.split('').uniq.join
    return true
  end

  raise 'Invalid format string.'
end

#valid_timestamp?(ts) ⇒ Boolean

Although the SDK does value checking, we’ll add another layer of input checing here. See if the time looks valid. We’ll assume anything before 2000/01/01 or after a year from now is wrong. Arbitrary, but there has to be a cut-off somewhere.

Returns:

  • (Boolean)


219
220
221
222
# File 'lib/wavefront-cli/write.rb', line 219

def valid_timestamp?(ts)
  (ts.is_a?(Integer) || ts.match(/^\d+$/)) &&
    ts.to_i > 946684800 && ts.to_i < (Time.now.to_i + 31557600)
end

#validate_optsObject



224
225
226
227
228
229
230
# File 'lib/wavefront-cli/write.rb', line 224

def validate_opts
  unless options[:metric] || options[:format].include?('m')
    abort "Supply a metric path in the file or with '-m'."
  end

  raise 'Please supply a proxy address.' unless options[:proxy]
end