Class: Wavefront::Write

Inherits:
Base
  • Object
show all
Defined in:
lib/wavefront-sdk/write.rb

Overview

This class helps you send points to a Wavefront proxy in native format. Usually this is done on port 2878.

rubocop:disable Metrics/ClassLength

Instance Attribute Summary collapse

Attributes inherited from Base

#conn, #debug, #logger, #net, #noop, #opts, #update_keys, #verbose

Instance Method Summary collapse

Methods inherited from Base

#api_base, #api_delete, #api_get, #api_post, #api_put, #everything, #hash_for_update, #initialize, #log, #mk_conn, #print_message, #respond, #time_to_ms

Methods included from Mixins

#parse_relative_time, #parse_time, #relative_time, #time_multiplier

Methods included from Validators

#wf_alert_id?, #wf_alert_severity?, #wf_cloudintegration_id?, #wf_dashboard_id?, #wf_epoch?, #wf_event_id?, #wf_granularity?, #wf_integration_id?, #wf_link_id?, #wf_link_template?, #wf_maintenance_window_id?, #wf_message_id?, #wf_metric_name?, #wf_ms_ts?, #wf_name?, #wf_notificant_id?, #wf_point?, #wf_point_tag?, #wf_point_tags?, #wf_proxy_id?, #wf_savedsearch_entity?, #wf_savedsearch_id?, #wf_source_id?, #wf_string?, #wf_tag?, #wf_ts?, #wf_user_id?, #wf_value?, #wf_version?, #wf_webhook_id?

Constructor Details

This class inherits a constructor from Wavefront::Base

Instance Attribute Details

#sockObject (readonly)

Returns the value of attribute sock.



14
15
16
# File 'lib/wavefront-sdk/write.rb', line 14

def sock
  @sock
end

#summaryObject (readonly)

Returns the value of attribute summary.



14
15
16
# File 'lib/wavefront-sdk/write.rb', line 14

def summary
  @summary
end

Instance Method Details

#closeObject

Close the socket described by the @sock instance variable.



241
242
243
244
245
# File 'lib/wavefront-sdk/write.rb', line 241

def close
  return if opts[:noop]
  log('Closing connection to proxy.', :info)
  sock.close
end

#hash_to_wf(point) ⇒ Object

Convert a validated point to a string conforming to community.wavefront.com/docs/DOC-1031. No validation is done here.

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/CyclomaticComplexity

Parameters:

  • point (Hash)

    a hash describing a point. See #write() for the format.



177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/wavefront-sdk/write.rb', line 177

def hash_to_wf(point)
  unless point.key?(:path) && point.key?(:value)
    raise Wavefront::Exception::InvalidPoint
  end

  p[:source] = HOSTNAME unless point.key?(:source)

  m = [point[:path], point[:value]]
  m.<< point[:ts] if point[:ts]
  m.<< 'source=' + point[:source]
  m.<< point[:tags].to_wf_tag if point[:tags]
  m.<< opts[:tags].to_wf_tag if opts[:tags]
  m.join(' ')
end

#openObject

Open a socket to a Wavefront proxy, putting the descriptor in instance variable @sock.



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/wavefront-sdk/write.rb', line 221

def open
  if opts[:noop]
    log('No-op requested. Not opening connection to proxy.')
    return true
  end

  port = net[:port] || 2878

  log("Connecting to #{net[:proxy]}:#{port}.", :info)

  begin
    @sock = TCPSocket.new(net[:proxy], port)
  rescue StandardError => e
    log(e, :error)
    raise Wavefront::Exception::InvalidEndpoint
  end
end

#paths_to_deltas(points) ⇒ Array[Hash]

Prefix all paths in a points array (as passed to #write_delta() with a delta symbol

Parameters:

Returns:



145
146
147
# File 'lib/wavefront-sdk/write.rb', line 145

def paths_to_deltas(points)
  [points].flatten.map { |p| p.tap { p[:path] = DELTA + p[:path] } }
end

#post_initialize(_creds = {}, options = {}) ⇒ Object

Construct an object which allows us to write points to a Wavefront proxy.

Parameters:

  • _creds (Hash) (defaults to: {})

    dummy parameter for correct method signature.

  • options (Hash) (defaults to: {})

    can contain the following keys: proxy [String] the address of the Wavefront proxy. (‘wavefront’) port [Integer] the port of the Wavefront proxy (2878) tags [Hash] point tags which will be applied to every point noop [Bool] if true, no proxy connection will be made, and

    instead of sending the points, they will be printed in
    Wavefront wire format.
    

    novalidate [Bool] if true, points will not be validated.

    This might make things go marginally quicker if you have
    done point validation higher up in the chain.
    

    verbose [Bool] debug [Bool]



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/wavefront-sdk/write.rb', line 34

def post_initialize(_creds = {}, options = {})
  defaults = { tags:       nil,
               noop:       false,
               novalidate: false,
               verbose:    false,
               debug:      false }

  @summary = { sent: 0, rejected: 0, unsent: 0 }
  @opts = setup_options(options, defaults)

  wf_point_tags?(opts[:tags]) if opts[:tags]
end

#prepped_points(points, prefix = nil) ⇒ Array

Returns of points.

Returns:



116
117
118
119
120
121
122
123
124
# File 'lib/wavefront-sdk/write.rb', line 116

def prepped_points(points, prefix = nil)
  ret = [points].flatten

  if prefix
    ret.map! { |pt| pt.tap { |p| p[:path] = prefix + '.' + p[:path] } }
  end

  ret
end

#raw(points, openclose = true) ⇒ Object

Send raw data to a Wavefront proxy, automatically opening and closing a socket.

Parameters:

  • points (Array[String])

    an array of points in native Wavefront wire format, as described in community.wavefront.com/docs/DOC-1031. No validation is performed.

  • openclose (Boolean) (defaults to: true)

    whether or not to automatically open a socket to the proxy before sending points, and afterwards, close it.



62
63
64
65
66
67
68
69
70
# File 'lib/wavefront-sdk/write.rb', line 62

def raw(points, openclose = true)
  open if openclose

  begin
    [points].flatten.each { |p| send_point(p) }
  ensure
    close if openclose
  end
end

#send_point(point) ⇒ Object

Send a point which is already in Wavefront wire format.

Parameters:

  • point (String)

    a point description, probably from #hash_to_wf()



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/wavefront-sdk/write.rb', line 197

def send_point(point)
  if opts[:noop]
    log "Would send: #{point}"
    return
  end

  log("Sending: #{point}", :info)

  begin
    sock.puts(point)
  rescue StandardError => e
    summary[:unsent] += 1
    log('WARNING: failed to send point.')
    log(e.to_s, :debug)
    return false
  end

  summary[:sent] += 1
  true
end

#setup_options(user, defaults) ⇒ Object



47
48
49
# File 'lib/wavefront-sdk/write.rb', line 47

def setup_options(user, defaults)
  defaults.merge(user)
end

#summary_string(summary) ⇒ Object



110
111
112
# File 'lib/wavefront-sdk/write.rb', line 110

def summary_string(summary)
  summary[:unsent].zero? && summary[:rejected].zero? ? 'OK' : 'ERROR'
end

#valid_point?(point) ⇒ Boolean

rubocop:disable Metrics/MethodLength

Returns:

  • (Boolean)


150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/wavefront-sdk/write.rb', line 150

def valid_point?(point)
  return true if opts[:novalidate]

  begin
    wf_point?(point)
    return true
  rescue Wavefront::Exception::InvalidMetricName,
         Wavefront::Exception::InvalidMetricValue,
         Wavefront::Exception::InvalidTimestamp,
         Wavefront::Exception::InvalidSourceId,
         Wavefront::Exception::InvalidTag => e
    log('Invalid point, skipping.', :info)
    log("Invalid point: #{point}. (#{e})", :debug)
    summary[:rejected] += 1
    return false
  end
end

#write(points = [], openclose = true, prefix = nil) ⇒ Object

Send multiple points to a Wavefront proxy.

Parameters:

  • points (Array[Hash]) (defaults to: [])

    an array of points. Each point is defined as a hash with the following keys:

    path [String] metric path. (mandatory)
    value [Numeric] value of metric. Numeric. Mandatory.
    ts [Time, Integer] timestamp for point. Defaults to
      current UTC time.
    source [String] originating source of metric. Defaults to
      the local hostname.
    tags [Hash] key: value point tags which will be applied in
      addition to any tags defined in the #initialize()
      method.
    
  • openclose (Bool) (defaults to: true)

    if this is false, you must have already opened a socket to the proxy. If it is true, a connection will be opened for you, used, and closed.

  • prefix (String) (defaults to: nil)

    prefix all metrics with this string. No trailing dot is required.

Returns:

  • true if no points are rejected, otherwise false

Raises:

  • any unhandled point validation error is passed through



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/wavefront-sdk/write.rb', line 93

def write(points = [], openclose = true, prefix = nil)
  open if openclose

  begin
    _write_loop(prepped_points(points, prefix))
  ensure
    close if openclose
  end

  s_str = summary_string(summary)

  resp = { status:   { result: s_str, message: nil, code: nil },
           response: summary }.to_json

  Wavefront::Response.new(resp, nil)
end

#write_delta(points, openclose = true) ⇒ Object

A wrapper method around #write() which guarantees all points will be sent as deltas. You can still manually prefix any metric with a delta symbol and use #write(), but depending on your use-case, this method may be safer. It’s easy to forget the delta.

Parameters:

  • points (Array[Hash])

    see #write()

  • openclose (Bool) (defaults to: true)

    see #write()



135
136
137
# File 'lib/wavefront-sdk/write.rb', line 135

def write_delta(points, openclose = true)
  write(paths_to_deltas(points), openclose)
end