Class: Wavefront::Write
Overview
This class sends points to a Wavefront proxy in native format. The proxy usually listens on port 2878, which is also the port #open falls back to when none is configured.
Instance Attribute Summary
-
#sock ⇒ Object
readonly
Returns the value of attribute sock.
-
#summary ⇒ Object
readonly
Returns the value of attribute summary.
Attributes inherited from Base
#conn, #debug, #logger, #net, #noop, #opts, #update_keys, #verbose
Instance Method Summary
-
#close ⇒ Object
Close the socket described by the @sock instance variable.
-
#hash_to_wf(p) ⇒ Object
Convert a validated point to a string conforming to community.wavefront.com/docs/DOC-1031.
-
#open ⇒ Object
Open a socket to a Wavefront proxy, putting the descriptor in instance variable @sock.
-
#paths_to_deltas(points) ⇒ Array[Hash]
Prefix all paths in a points array (as passed to #write_delta()) with a delta symbol.
-
#post_initialize(_creds = {}, options = {}) ⇒ Object
Construct an object which allows us to write points to a Wavefront proxy.
-
#raw(points, openclose = true) ⇒ Object
Send raw data to a Wavefront proxy, automatically opening and closing a socket.
-
#send_point(point) ⇒ Object
Send a point which is already in Wavefront wire format.
- #setup_options(user, defaults) ⇒ Object
- #valid_point?(p) ⇒ Boolean
-
#write(points = [], openclose = true) ⇒ Object
Send multiple points to a Wavefront proxy.
-
#write_delta(points, openclose = true) ⇒ Object
A wrapper method around #write() which guarantees all points will be sent as deltas.
Methods inherited from Base
#api_base, #api_delete, #api_get, #api_post, #api_put, #everything, #hash_for_update, #initialize, #log, #mk_conn, #print_message, #respond, #time_to_ms
Methods included from Mixins
#parse_relative_time, #parse_time, #relative_time, #time_multiplier
Methods included from Validators
#wf_alert_id?, #wf_alert_severity?, #wf_cloudintegration_id?, #wf_dashboard_id?, #wf_epoch?, #wf_event_id?, #wf_granularity?, #wf_integration_id?, #wf_link_id?, #wf_link_template?, #wf_maintenance_window_id?, #wf_message_id?, #wf_metric_name?, #wf_ms_ts?, #wf_name?, #wf_notificant_id?, #wf_point?, #wf_point_tag?, #wf_point_tags?, #wf_proxy_id?, #wf_savedsearch_entity?, #wf_savedsearch_id?, #wf_source_id?, #wf_string?, #wf_tag?, #wf_ts?, #wf_user_id?, #wf_value?, #wf_version?, #wf_webhook_id?
Constructor Details
This class inherits a constructor from Wavefront::Base
Instance Attribute Details
#sock ⇒ Object (readonly)
Returns the value of attribute sock.
# File 'lib/wavefront-sdk/write.rb', line 13
def sock
  @sock
end
#summary ⇒ Object (readonly)
Returns the value of attribute summary.
# File 'lib/wavefront-sdk/write.rb', line 13
def summary
  @summary
end
Instance Method Details
#close ⇒ Object
Close the socket described by the @sock instance variable.
# File 'lib/wavefront-sdk/write.rb', line 224
def close
  return if opts[:noop]
  log('Closing connection to proxy.', :info)
  sock.close
end
#hash_to_wf(p) ⇒ Object
Convert a validated point to a string conforming to community.wavefront.com/docs/DOC-1031. No validation is done here.
# File 'lib/wavefront-sdk/write.rb', line 160
def hash_to_wf(p)
  unless p.key?(:path) && p.key?(:value)
    raise Wavefront::Exception::InvalidPoint
  end

  p[:source] = HOSTNAME unless p.key?(:source)

  m = [p[:path], p[:value]]
  m.<< p[:ts] if p[:ts]
  m.<< 'source=' + p[:source]
  m.<< p[:tags].to_wf_tag if p[:tags]
  m.<< opts[:tags].to_wf_tag if opts[:tags]
  m.join(' ')
end
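The conversion performed by #hash_to_wf can be tried outside the SDK with a minimal sketch like the one below. The names `tags_to_wire` and `point_to_wire` are hypothetical, and `tags_to_wire` only assumes that the SDK's `to_wf_tag` helper renders tags as space-separated `key="value"` pairs; this page does not show that helper, so treat the tag format as an assumption.

```ruby
require 'socket'

# Hypothetical stand-in for the SDK's Hash#to_wf_tag helper. ASSUMPTION:
# tags are rendered as space-separated key="value" pairs.
def tags_to_wire(tags)
  tags.map { |k, v| "#{k}=\"#{v}\"" }.join(' ')
end

# Hypothetical standalone version of the field ordering used by #hash_to_wf:
# path, value, optional timestamp, source, then optional tags.
def point_to_wire(point, default_source = Socket.gethostname)
  unless point.key?(:path) && point.key?(:value)
    raise ArgumentError, 'a point needs both :path and :value'
  end

  fields = [point[:path], point[:value]]
  fields << point[:ts] if point[:ts]
  fields << "source=#{point[:source] || default_source}"
  fields << tags_to_wire(point[:tags]) if point[:tags]
  fields.join(' ')
end

line = point_to_wire({ path: 'dev.cpu.load', value: 0.75,
                       ts: 1_500_000_000, source: 'web01',
                       tags: { env: 'test' } })
puts line
```

Unlike the listing above, this sketch does not write a default `:source` back into the caller's hash, and it leaves out the global `opts[:tags]` that the SDK appends.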
#open ⇒ Object
Open a socket to a Wavefront proxy, putting the descriptor in instance variable @sock.
# File 'lib/wavefront-sdk/write.rb', line 204
def open
  if opts[:noop]
    log('No-op requested. Not opening connection to proxy.')
    return true
  end

  port = net[:port] || 2878
  log("Connecting to #{net[:proxy]}:#{port}.", :info)

  begin
    @sock = TCPSocket.new(net[:proxy], port)
  rescue => e
    log(e, :error)
    raise Wavefront::Exception::InvalidEndpoint
  end
end
#paths_to_deltas(points) ⇒ Array[Hash]
Prefix all paths in a points array (as passed to #write_delta()) with a delta symbol.
# File 'lib/wavefront-sdk/write.rb', line 130
def paths_to_deltas(points)
  [points].flatten.map { |p| p.tap { p[:path] = DELTA + p[:path] } }
end
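Because the listing above assigns to `p[:path]` inside `tap`, the caller's hashes are modified in place, so passing the same points twice would prefix them twice. The sketch below is a non-mutating variant for comparison, not the SDK's implementation. `DELTA_PREFIX` and `with_delta_paths` are hypothetical names, and the prefix value is an assumption taken from the Δ character used in the #write_delta description; the SDK's own `DELTA` constant is not shown on this page.

```ruby
# ASSUMPTION: the delta prefix is the Greek capital delta shown in the
# #write_delta description. The SDK's DELTA constant may differ.
DELTA_PREFIX = "\u0394"

# Hypothetical non-mutating variant: returns new hashes and leaves the
# caller's points untouched. Accepts a single point or an array of points.
def with_delta_paths(points)
  [points].flatten.map { |p| p.merge(path: DELTA_PREFIX + p[:path]) }
end

original = { path: 'app.requests', value: 1 }
deltas = with_delta_paths(original)

puts deltas.first[:path]
puts original[:path]
```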
#post_initialize(_creds = {}, options = {}) ⇒ Object
Construct an object which allows us to write points to a Wavefront proxy.
# File 'lib/wavefront-sdk/write.rb', line 33
def post_initialize(_creds = {}, options = {})
  defaults = { tags: nil,
               noop: false,
               novalidate: false,
               verbose: false,
               debug: false }

  @summary = { sent: 0, rejected: 0, unsent: 0 }
  @opts = setup_options(options, defaults)

  (opts[:tags]) if opts[:tags]
end
#raw(points, openclose = true) ⇒ Object
Send raw data to a Wavefront proxy, automatically opening and closing a socket.
# File 'lib/wavefront-sdk/write.rb', line 61
def raw(points, openclose = true)
  open if openclose

  begin
    [points].flatten.each { |p| send_point(p) }
  ensure
    close if openclose
  end
end
#send_point(point) ⇒ Object
Send a point which is already in Wavefront wire format.
# File 'lib/wavefront-sdk/write.rb', line 180
def send_point(point)
  if opts[:noop]
    log "Would send: #{point}"
    return
  end

  log("Sending: #{point}", :info)

  begin
    sock.puts(point)
  rescue => e
    summary[:unsent] += 1
    log('WARNING: failed to send point.')
    log(e.to_s, :debug)
    return false
  end

  summary[:sent] += 1
  true
end
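What #open and #send_point do on the wire can be observed with the Ruby standard library alone. The sketch below is only an illustration: a throwaway `TCPServer` on the loopback interface stands in for a Wavefront proxy (an assumption made purely for this demonstration), and the point string is a made-up example already in wire format.

```ruby
require 'socket'

# Stand-in for a proxy: a local listener on an ephemeral port (port 0 asks
# the OS to pick a free one). A real proxy usually listens on 2878.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]

received = nil
listener = Thread.new do
  client = server.accept
  received = client.gets # one newline-terminated point
  client.close
end

# The client side mirrors #open (TCPSocket.new) and #send_point (sock.puts).
sock = TCPSocket.new('127.0.0.1', port)
sock.puts('dev.cpu.load 0.75 source=web01')
sock.close

listener.join
server.close
puts received
```

Each point travels as a single newline-terminated line, which is why #send_point can use a plain `puts` on the socket.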
#setup_options(user, defaults) ⇒ Object
# File 'lib/wavefront-sdk/write.rb', line 46
def setup_options(user, defaults)
  defaults.merge(user)
end
#valid_point?(p) ⇒ Boolean
# File 'lib/wavefront-sdk/write.rb', line 134
def valid_point?(p)
  return true if opts[:novalidate]

  begin
    wf_point?(p)
    return true
  rescue Wavefront::Exception::InvalidMetricName,
         Wavefront::Exception::InvalidMetricValue,
         Wavefront::Exception::InvalidTimestamp,
         Wavefront::Exception::InvalidSourceId,
         Wavefront::Exception::InvalidTag => e
    log('Invalid point, skipping.', :info)
    log("Invalid point: #{p}. (#{e})", :debug)
    summary[:rejected] += 1
    return false
  end
end
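The pattern in #valid_point? is to turn a raising validator into a boolean while counting rejections. A minimal sketch of that pattern follows. `InvalidPointError` and `check_point!` are hypothetical stand-ins for the `Wavefront::Exception` classes and the `wf_point?` validator, and their rules are much simpler than the SDK's. Note that the #write listing on this page calls #valid_point? without checking its result; the sketch shows one way a caller could use the return value to skip rejected points.

```ruby
# Hypothetical error class standing in for the Wavefront::Exception::Invalid*
# family rescued by #valid_point?.
class InvalidPointError < StandardError; end

# Hypothetical, deliberately simple validator. The SDK's wf_point? performs
# more checks than this.
def check_point!(point)
  path = point[:path]
  raise InvalidPointError, 'missing path' unless path.is_a?(String) && !path.empty?
  raise InvalidPointError, 'value is not numeric' unless point[:value].is_a?(Numeric)

  true
end

# Converts the raising validator into true/false and counts rejections.
def valid_point?(point, summary, novalidate = false)
  return true if novalidate

  check_point!(point)
rescue InvalidPointError
  summary[:rejected] += 1
  false
end

summary = { sent: 0, rejected: 0, unsent: 0 }
points = [{ path: 'dev.cpu.load', value: 0.75 },
          { path: 'dev.cpu.load', value: 'high' }]

# Keep only the points that pass validation.
accepted = points.select { |p| valid_point?(p, summary) }
puts accepted.size
puts summary[:rejected]
```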
#write(points = [], openclose = true) ⇒ Object
Send multiple points to a Wavefront proxy.
# File 'lib/wavefront-sdk/write.rb', line 90
def write(points = [], openclose = true)
  open if openclose

  begin
    [points].flatten.each do |p|
      p[:ts] = p[:ts].to_i if p[:ts].is_a?(Time)
      valid_point?(p)
      send_point(hash_to_wf(p))
    end
  ensure
    close if openclose
  end

  s_str = summary[:unsent] == 0 && summary[:rejected] == 0 ? 'OK' : 'ERROR'

  resp = { status: { result: s_str, message: nil, code: nil },
           response: summary }.to_json

  Wavefront::Response.new(resp, nil)
end
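The status rule at the end of #write is simple: the result is 'OK' only when no point was rejected and none failed to send. The sketch below reproduces that rule and the JSON shape in isolation. `result_for` and `response_body` are hypothetical helper names, and the sketch stops short of constructing a `Wavefront::Response`, whose behaviour is not documented on this page.

```ruby
require 'json'

# Mirrors the status rule in #write: any unsent or rejected point means ERROR.
def result_for(summary)
  summary[:unsent].zero? && summary[:rejected].zero? ? 'OK' : 'ERROR'
end

# Builds the same JSON shape that #write passes to Wavefront::Response.new.
def response_body(summary)
  { status: { result: result_for(summary), message: nil, code: nil },
    response: summary }.to_json
end

clean = response_body({ sent: 3, rejected: 0, unsent: 0 })
partial = response_body({ sent: 2, rejected: 1, unsent: 0 })

puts JSON.parse(clean)['status']['result']
puts JSON.parse(partial)['status']['result']
```

A caller that needs to know how many points were dropped can read the counters under the `response` key rather than relying on the result string alone.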
#write_delta(points, openclose = true) ⇒ Object
A wrapper method around #write() which guarantees all points will be sent as deltas. You can still manually prefix any metric with a Δ and use #write(), but depending on your use-case, this method may be safer. It’s easy to forget the Δ.
# File 'lib/wavefront-sdk/write.rb', line 120
def write_delta(points, openclose = true)
  write(paths_to_deltas(points), openclose)
end