Class: Wavefront::BatchWriter
- Inherits: Object
  - Object
  - Wavefront::BatchWriter
- Includes: Constants, Validators
- Defined in: lib/wavefront/batch_writer.rb
Overview
This class exists to facilitate sending of multiple data points to a Wavefront proxy. It sends points in native Wavefront format.
When initializing the instance you can define point tags which will apply to all points sent via that instance.
The class provides open_socket() and close_socket(), but it is the developer’s responsibility to call them to open and close the socket to the proxy. Points are sent by calling the write() method.
The class keeps a count of the points the current instance has sent, rejected as invalid, and failed to send, in @summary (keys :sent, :rejected, and :unsent). The socket is accessed through the instance variable @sock.
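Because the caller owns the socket life cycle, one way to avoid leaking a connection is to pair the open and close calls in an ensure block. The sketch below is illustrative only: StandInWriter is a hypothetical stand-in that mimics the documented open_socket, close_socket, sock, and summary interface using a StringIO instead of a TCP socket, and with_open_socket is a helper name invented here, not part of the library.

```ruby
require 'stringio'

# Hypothetical stand-in for illustration; NOT Wavefront::BatchWriter.
class StandInWriter
  attr_reader :sock, :summary

  def initialize
    @summary = { sent: 0, rejected: 0, unsent: 0 }
  end

  def open_socket
    @sock = StringIO.new
  end

  def close_socket
    sock.close
  end

  # Accepts a line already in wire format, to keep the sketch short.
  def write(line)
    sock.puts(line)
    summary[:sent] += 1
    true
  end
end

# Hypothetical helper: open the socket, yield, and always close it.
def with_open_socket(writer)
  writer.open_socket
  yield writer
ensure
  writer.close_socket if writer.sock && !writer.sock.closed?
end

writer = StandInWriter.new
with_open_socket(writer) { |w| w.write('test.metric 1 source=host1') }
puts "sent=#{writer.summary[:sent]} closed=#{writer.sock.closed?}"
```

The same open, yield, ensure-close shape should carry over to a real instance, assuming its sock responds to closed? as a TCPSocket does.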
Constant Summary
Constants included from Constants
Constants::ALERT_FORMATS, Constants::DEFAULT_ALERT_FORMAT, Constants::DEFAULT_FORMAT, Constants::DEFAULT_HOST, Constants::DEFAULT_INFILE_FORMAT, Constants::DEFAULT_OBSOLETE_METRICS, Constants::DEFAULT_PERIOD_SECONDS, Constants::DEFAULT_PREFIX_LENGTH, Constants::DEFAULT_PROXY, Constants::DEFAULT_PROXY_PORT, Constants::DEFAULT_STRICT, Constants::EVENT_LEVELS, Constants::EVENT_STATE_DIR, Constants::FORMATS, Constants::GRANULARITIES
Instance Attribute Summary
- #opts ⇒ Object (readonly)
  Returns the value of attribute opts.
- #sock ⇒ Object (readonly)
  Returns the value of attribute sock.
- #summary ⇒ Object (readonly)
  Returns the value of attribute summary.
Instance Method Summary
- #close_socket ⇒ Object
- #hash_to_wf(p) ⇒ Object
- #initialize(options = {}) ⇒ BatchWriter (constructor)
  A new instance of BatchWriter.
- #open_socket ⇒ Object
- #send_point(point) ⇒ Object
- #setup_options(user, defaults) ⇒ Object
- #tag_hash_to_str(tags) ⇒ Object
- #valid_point?(point) ⇒ Boolean
- #write(points = [], options = {}) ⇒ Object
Methods included from Validators
#valid_path?, #valid_source?, #valid_string?, #valid_tags?, #valid_ts?, #valid_value?
Constructor Details
#initialize(options = {}) ⇒ BatchWriter
Returns a new instance of BatchWriter.
# File 'lib/wavefront/batch_writer.rb', line 33
def initialize(options = {})
  #
  # options is of the form:
  #
  # {
  #   tags:       a key-value hash of tags which will be applied to
  #               every point
  #   proxy:      the address of the Wavefront proxy
  #   port:       the port of the Wavefront proxy
  #   noop:       if this is true, no proxy connection will be made,
  #               and instead of sending the points, they will
  #               be printed in Wavefront wire format.
  #   novalidate: if this is true, points will not be validated.
  #               This might make things go marginally quicker
  #               if you have done point validation higher up in
  #               the chain.
  #   verbose:    if this is true, many of the methods will report
  #               their progress.
  #   debug:      if this is true, debugging output will be
  #               printed.
  # }
  #
  defaults = {
    tags:       false,
    proxy:      DEFAULT_PROXY,
    port:       DEFAULT_PROXY_PORT,
    noop:       false,
    novalidate: false,
    verbose:    false,
    debug:      false,
  }

  @summary = {
    sent:     0,
    rejected: 0,
    unsent:   0,
  }

  @opts = setup_options(options, defaults)

  if opts[:tags]
    valid_tags?(opts[:tags])
    @global_tags = opts[:tags]
  end
end
Instance Attribute Details
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
# File 'lib/wavefront/batch_writer.rb', line 29
def opts
  @opts
end
#sock ⇒ Object (readonly)
Returns the value of attribute sock.
# File 'lib/wavefront/batch_writer.rb', line 29
def sock
  @sock
end
#summary ⇒ Object (readonly)
Returns the value of attribute summary.
# File 'lib/wavefront/batch_writer.rb', line 29
def summary
  @summary
end
Instance Method Details
#close_socket ⇒ Object
# File 'lib/wavefront/batch_writer.rb', line 208
def close_socket
  return if opts[:noop]
  puts 'Closing connection to proxy.' if opts[:verbose]
  sock.close
end
#hash_to_wf(p) ⇒ Object
# File 'lib/wavefront/batch_writer.rb', line 139
def hash_to_wf(p)
  #
  # Convert the hash received by the write() method to a string
  # conforming with that defined in
  # https://community.wavefront.com/docs/DOC-1031
  #
  fail ArgumentError unless p.key?(:path) && p.key?(:value) &&
                            p.key?(:source)

  m = [p[:path], p[:value]]
  m.<< p[:ts].to_i.to_s if p.key?(:ts) && p[:ts]
  m.<< 'source=' + p[:source]
  m.<< tag_hash_to_str(p[:tags]) if p.key?(:tags) && p[:tags]
  m.<< tag_hash_to_str(opts[:tags]) if opts[:tags]
  m.join(' ')
end
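To make the field order of the resulting wire-format line concrete, here is a standalone restatement of the conversion above. It is a sketch, not the library method: point_to_wire and tags_to_s are hypothetical names, and the global_tags argument stands in for opts[:tags]. As in the listing, a point without :path, :value, or :source raises ArgumentError, and the timestamp is omitted when :ts is absent or false.

```ruby
# Hypothetical helper mirroring tag_hash_to_str.
def tags_to_s(tags)
  return '' unless tags.is_a?(Hash)
  tags.map { |k, v| "#{k}=\"#{v}\"" }.join(' ')
end

# Hypothetical standalone version of hash_to_wf; global_tags stands in
# for opts[:tags].
def point_to_wire(point, global_tags = nil)
  unless point.key?(:path) && point.key?(:value) && point.key?(:source)
    raise ArgumentError, 'path, value and source are required'
  end

  fields = [point[:path], point[:value]]
  fields << point[:ts].to_i.to_s if point[:ts]
  fields << "source=#{point[:source]}"
  fields << tags_to_s(point[:tags]) if point[:tags]
  fields << tags_to_s(global_tags) if global_tags
  fields.join(' ')
end

line = point_to_wire(
  { path: 'test.metric', value: 3.14, ts: Time.at(1_469_987_572),
    source: 'host1', tags: { env: 'dev' } },
  { dc: 'lon' }
)
puts line
# prints: test.metric 3.14 1469987572 source=host1 env="dev" dc="lon"
```

Point tags come before instance-wide tags, so the order is path, value, optional timestamp, source, point tags, global tags.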
#open_socket ⇒ Object
# File 'lib/wavefront/batch_writer.rb', line 189
def open_socket
  #
  # Open a socket to a Wavefront proxy, putting the descriptor
  # in instance variable @sock.
  #
  if opts[:noop]
    puts 'No-op requested. Not opening connection to proxy.'
    return true
  end

  puts "Connecting to #{opts[:proxy]}:#{opts[:port]}." if opts[:verbose]

  begin
    @sock = TCPSocket.new(opts[:proxy], opts[:port])
  rescue
    raise Wavefront::Exception::InvalidEndpoint
  end
end
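The bare rescue above turns any connection failure into a single exception type for the caller to handle. The sketch below shows that pattern in isolation, under stated assumptions: InvalidEndpointSketch is a hypothetical class standing in for Wavefront::Exception::InvalidEndpoint, connect_to_proxy is an invented helper, and unlike the listing it records the underlying error class in the message.

```ruby
require 'socket'

# Hypothetical error class standing in for
# Wavefront::Exception::InvalidEndpoint.
class InvalidEndpointSketch < StandardError; end

# Hypothetical helper: wrap any connection failure in one error type.
def connect_to_proxy(host, port)
  TCPSocket.new(host, port)
rescue StandardError => e
  raise InvalidEndpointSketch, "cannot connect to #{host}:#{port} (#{e.class})"
end

# Find a local port with nothing listening: bind to port 0, note the
# port the OS chose, then close the listener again.
server = TCPServer.new('127.0.0.1', 0)
free_port = server.addr[1]
server.close

failed = false
begin
  connect_to_proxy('127.0.0.1', free_port)
rescue InvalidEndpointSketch => e
  failed = true
  puts "Could not open socket: #{e.message}"
end
```

A caller of the real class would presumably rescue Wavefront::Exception::InvalidEndpoint around open_socket in the same way.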
#send_point(point) ⇒ Object
# File 'lib/wavefront/batch_writer.rb', line 166
def send_point(point)
  #
  # Send a point, which should already be in Wavefront wire
  # format.
  #
  if opts[:noop]
    puts "Would send: #{point}"
    return
  end

  puts "Sending: #{point}" if opts[:verbose] || opts[:debug]

  begin
    sock.puts(point)
    summary[:sent] += 1
    return true
  rescue
    summary[:unsent] += 1
    puts 'WARNING: failed to send point.'
    return false
  end
end
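The send-and-count behaviour above can be exercised without a proxy, since any object that responds to puts can play the role of the socket. This is a minimal sketch with a hypothetical send_line helper and StringIO objects in place of @sock; a closed StringIO raises IOError on puts, which simulates a failed write.

```ruby
require 'stringio'

# Hypothetical free-standing version of the send-and-count logic.
def send_line(sock, line, summary)
  sock.puts(line)
  summary[:sent] += 1
  true
rescue StandardError
  summary[:unsent] += 1
  false
end

summary = { sent: 0, unsent: 0 }

good = StringIO.new
send_line(good, 'test.metric 1 source=host1', summary)

closed = StringIO.new
closed.close
send_line(closed, 'test.metric 2 source=host1', summary) # puts raises IOError

puts "sent=#{summary[:sent]} unsent=#{summary[:unsent]}"
# prints: sent=1 unsent=1
```

Note that in the listing, the noop branch returns nil and updates neither counter, so callers that test the return value should treat nil separately from false.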
#setup_options(user, defaults) ⇒ Object
# File 'lib/wavefront/batch_writer.rb', line 77
def setup_options(user, defaults)
  #
  # Fill in some defaults, if the user hasn't supplied them
  #
  defaults.merge(user)
end
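Since the options are combined with a plain Hash#merge, user-supplied keys win, unknown keys pass through untouched, and an explicit nil overrides a default. A short sketch of those three effects follows; the proxy name and port used as defaults here are placeholder assumptions, not the actual values of DEFAULT_PROXY and DEFAULT_PROXY_PORT.

```ruby
# Placeholder defaults for illustration only.
defaults = { tags: false, proxy: 'wavefront', port: 2878, noop: false }

# User options: one override, one extra key, one explicit nil.
user = { proxy: 'proxy.example.com', extra: 1, port: nil }

opts = defaults.merge(user)

puts opts[:proxy]          # prints: proxy.example.com
puts opts[:noop]           # prints: false
puts opts[:extra]          # prints: 1
puts opts[:port].inspect   # prints: nil
```

The last case means a caller that passes port: nil does not get the default port back, so it may be worth dropping nil values before constructing the writer.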
#tag_hash_to_str(tags) ⇒ Object
# File 'lib/wavefront/batch_writer.rb', line 156
def tag_hash_to_str(tags)
  #
  # Convert a hash of tags into a string of key="val" tags. The
  # quoting is recommended in the WF wire-format guide. No tag
  # validation is done here: we assume you used valid_tags()
  #
  return '' unless tags.is_a?(Hash)
  tags.map { |k, v| "#{k}=\"#{v}\"" }.join(' ')
end
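A few inputs show what the conversion above produces, including the edge cases. The helper below is a copy of the same logic under the hypothetical name quote_tags, so that it runs on its own. Because no escaping is performed, a value that itself contains a double quote comes out unescaped, which is one reason to validate tags beforehand.

```ruby
# Hypothetical standalone copy of the tag-quoting logic.
def quote_tags(tags)
  return '' unless tags.is_a?(Hash)
  tags.map { |k, v| "#{k}=\"#{v}\"" }.join(' ')
end

puts quote_tags({ env: 'prod', dc: 'lon' })
# prints: env="prod" dc="lon"

puts quote_tags('not a hash').inspect
# prints: ""

puts quote_tags({ note: 'say "hi"' })
# prints: note="say "hi""
```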
#valid_point?(point) ⇒ Boolean
# File 'lib/wavefront/batch_writer.rb', line 125
def valid_point?(point)
  #
  # Validate a point so it conforms to the standard described in
  # https://community.wavefront.com/docs/DOC-1031
  #
  return true if opts.key?(:novalidate) && opts[:novalidate]
  valid_path?(point[:path])
  valid_value?(point[:value])
  valid_ts?(point[:ts]) if point[:ts]
  valid_source?(point[:source])
  valid_tags?(point[:tags]) if point[:tags] && point[:tags].length > 0
  true
end
#write(points = [], options = {}) ⇒ Object
# File 'lib/wavefront/batch_writer.rb', line 84
def write(points = [], options = {})
  #
  # Points are defined as hashes of the following form:
  # {
  #   path:   metrics path. String. Mandatory.
  #   value:  value of metric. Numeric. Mandatory.
  #   ts:     timestamp as a Time or Date object. default:
  #           Time.now. May be omitted or false.
  #   source: originating source of metric. default: `hostname`
  #   tags:   optional hash of key: value point tags
  # }
  #
  # Send multiple points by using an array of the above hashes.
  #
  unless points.is_a?(Hash) || points.is_a?(Array)
    summary[:rejected] += 1
    return false
  end

  points = [points] if points.is_a?(Hash)

  points.each do |p|
    p[:ts] = Time.at(p[:ts]) if p[:ts].is_a?(Integer)
    begin
      valid_point?(p)
    rescue Wavefront::Exception::InvalidMetricName,
           Wavefront::Exception::InvalidMetricValue,
           Wavefront::Exception::InvalidTimestamp,
           Wavefront::Exception::InvalidSource,
           Wavefront::Exception::InvalidTag => e
      puts 'Invalid point, skipping.' if opts[:verbose]
      puts "Invalid point: #{p}. (#{e})" if opts[:debug]
      summary[:rejected] += 1
      next
    end

    send_point(hash_to_wf(p))
  end

  summary[:rejected] == 0 ? true : false
end
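The input handling at the top of write() accepts either one point hash or an array of them, and converts integer timestamps to Time objects. The sketch below restates just that step with a hypothetical normalise_points helper. Two deliberate differences from the listing: it returns nil for unusable input instead of touching a counter, and it works on copies, whereas the listing rewrites p[:ts] in the caller's hash.

```ruby
# Hypothetical helper restating the input normalisation in write().
def normalise_points(points)
  return nil unless points.is_a?(Hash) || points.is_a?(Array)

  points = [points] if points.is_a?(Hash)
  points.map do |p|
    copy = p.dup
    copy[:ts] = Time.at(copy[:ts]) if copy[:ts].is_a?(Integer)
    copy
  end
end

single = normalise_points(
  { path: 'test.metric', value: 1, ts: 1_469_987_572, source: 'host1' }
)
puts single.length             # prints: 1
puts single.first[:ts].class   # prints: Time
puts single.first[:ts].to_i    # prints: 1469987572
puts normalise_points('nope').inspect # prints: nil
```

One consequence of the final line of the listing is worth keeping in mind: the return value is computed from summary[:rejected], which the code shown never resets, so once any point has been rejected, later write() calls on the same instance also return false.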