Class: Spectator::Publisher
- Inherits: Object
  - Object
  - Spectator::Publisher
- Defined in: lib/spectator/registry.rb
Overview
Internal class used to publish measurements to an aggregator service.
Constant Summary
- ADD_OP = 0
- MAX_OP = 10
- UNKNOWN_OP = -1
- OPS = { count: ADD_OP, totalAmount: ADD_OP, totalTime: ADD_OP, totalOfSquares: ADD_OP, percentile: ADD_OP, max: MAX_OP, gauge: MAX_OP, activeTasks: MAX_OP, duration: MAX_OP }.freeze
Instance Method Summary
- #append_measurement(payload, table, measure) ⇒ Object
  Add a measurement to our payload table.
- #build_string_table(measurements) ⇒ Object
  Build a string table from the list of measurements. Unique words are identified and assigned a number starting from 0, based on their lexicographical order.
- #initialize(registry) ⇒ Publisher (constructor)
  Returns a new instance of Publisher.
- #op_for_measurement(measure) ⇒ Object
  Get the operation to be used for the given Measure. Gauges are aggregated using MAX_OP, counters using ADD_OP.
- #payload_for_measurements(measurements) ⇒ Object
  Generate a payload from the list of measurements. The payload is an array containing the number of elements in the string table, the string table itself, and the measurements.
- #publish ⇒ Object
  Publish loop: send measurements to the aggregator endpoint ':uri' every ':frequency' seconds.
- #registry_measurements ⇒ Object
  Get a list of measurements that should be sent.
- #send_metrics_now ⇒ Object
  Send the current measurements to our aggregator service.
- #should_send(measure) ⇒ Object
  Gauges are sent if they have a value (that is, the value is not NaN). Counters are sent if their value is greater than 0.
- #should_start? ⇒ Boolean
- #start ⇒ Object
  Start publishing if the config is acceptable: the uri is neither nil nor empty.
- #stop ⇒ Object
  Stop publishing measurements.
Constructor Details
#initialize(registry) ⇒ Publisher
Returns a new instance of Publisher.
    # File 'lib/spectator/registry.rb', line 116

    def initialize(registry)
      @registry = registry
      @started = false
      @should_stop = false
      @frequency = registry.config[:frequency] || 5
      @http = Http.new(registry)
    end
Instance Method Details
#append_measurement(payload, table, measure) ⇒ Object
Add a measurement to our payload table. The serialization for a measurement is:
- length of tags
- indexes for the tags based on the string table
- operation (add (0), max (10))
- floating point value
    # File 'lib/spectator/registry.rb', line 232

    def append_measurement(payload, table, measure)
      op = op_for_measurement(measure)
      = @registry.
      = measure.id.
      len = .length + 1 + .length
      payload.push(len)
      .each do |k, v|
        payload.push(table[k])
        payload.push(table[v])
      end
      .each do |k, v|
        payload.push(table[k])
        payload.push(table[v])
      end
      payload.push(table[:name])
      payload.push(table[measure.id.name])
      payload.push(op)
      payload.push(measure.value)
    end
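The serialization order listed above can be illustrated with a small standalone sketch. It is not the library's code: `encode_measurement` is a hypothetical helper that takes plain hashes and values instead of a Measure, and the string table is written out by hand on the assumption that all words are symbols.

```ruby
# Hypothetical helper: encodes one measurement following the documented order
# (pair count, tag indexes, name pair, operation, value).
def encode_measurement(table, common_tags, name, tags, op, value)
  out = [common_tags.length + tags.length + 1] # +1 accounts for the name pair
  common_tags.each { |k, v| out.push(table[k], table[v]) }
  tags.each { |k, v| out.push(table[k], table[v]) }
  out.push(table[:name], table[name])
  out.push(op, value)
  out
end

# Hand-written string table for this example (word => index).
table = { app: 0, count: 1, demo: 2, name: 3, requests: 4, statistic: 5 }

encoded = encode_measurement(
  table, { app: :demo }, :requests, { statistic: :count }, 0, 3.0
)
# encoded => [3, 0, 2, 5, 1, 3, 4, 0, 3.0]
```

Reading the result left to right: 3 key/value pairs follow, then (app, demo), (statistic, count), (name, requests) as table indexes, then the add operation (0) and the value.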
#build_string_table(measurements) ⇒ Object
Build a string table from the list of measurements. Unique words are identified and assigned a number starting from 0, based on their lexicographical order.

    # File 'lib/spectator/registry.rb', line 204

    def build_string_table(measurements)
      = @registry.
      table = {}
      .each do |k, v|
        table[k] = 0
        table[v] = 0
      end
      table[:name] = 0
      measurements.each do |m|
        table[m.id.name] = 0
        m.id..each do |k, v|
          table[k] = 0
          table[v] = 0
        end
      end
      keys = table.keys.sort
      keys.each_with_index do |str, index|
        table[str] = index
      end
      table
    end
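As an illustration of the indexing rule, here is a minimal sketch that works on plain hashes rather than on registry objects. The `string_table` helper, the `:name`/`:tags` hash layout, and the sample tags are assumptions made for this example; it also assumes every word is a symbol, so the words can be sorted against each other.

```ruby
# Hypothetical helper: collects every word (tag keys, tag values, names, and
# the literal :name key), then numbers the unique words in sorted order.
def string_table(measurements, common_tags = {})
  words = [:name]
  common_tags.each { |k, v| words.push(k, v) }
  measurements.each do |m|
    words.push(m[:name])
    m[:tags].each { |k, v| words.push(k, v) }
  end
  words.uniq.sort.each_with_index.to_h
end

measurements = [
  { name: :requests, tags: { statistic: :count } },
  { name: :latency,  tags: { statistic: :max } }
]
table = string_table(measurements, { app: :demo })
# table => { app: 0, count: 1, demo: 2, latency: 3, max: 4,
#            name: 5, requests: 6, statistic: 7 }
```

Each word appears once in the table no matter how many measurements use it, which is what keeps the payload compact when many measurements share tags.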
#op_for_measurement(measure) ⇒ Object
Get the operation to be used for the given Measure. Gauges are aggregated using MAX_OP, counters using ADD_OP.

    # File 'lib/spectator/registry.rb', line 186

    def op_for_measurement(measure)
      stat = measure.id..fetch(:statistic, :unknown)
      OPS.fetch(stat, UNKNOWN_OP)
    end
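The lookup can be tried in isolation with the sketch below. The constants are copied from the Constant Summary; `op_for_tags` is a hypothetical stand-in that takes a plain tag hash instead of a Measure.

```ruby
# Operation codes and statistic mapping, as listed in the Constant Summary.
ADD_OP = 0
MAX_OP = 10
UNKNOWN_OP = -1

OPS = {
  count: ADD_OP, totalAmount: ADD_OP, totalTime: ADD_OP,
  totalOfSquares: ADD_OP, percentile: ADD_OP,
  max: MAX_OP, gauge: MAX_OP, activeTasks: MAX_OP, duration: MAX_OP
}.freeze

# Hypothetical helper: resolves the operation from the :statistic tag,
# falling back to UNKNOWN_OP when the tag is missing or not recognized.
def op_for_tags(tags)
  stat = tags.fetch(:statistic, :unknown)
  OPS.fetch(stat, UNKNOWN_OP)
end

counter_op = op_for_tags({ statistic: :count }) # ADD_OP (0)
gauge_op   = op_for_tags({ statistic: :gauge }) # MAX_OP (10)
missing_op = op_for_tags({ app: :demo })        # UNKNOWN_OP (-1)
```

Using `fetch` with a default on both lookups means an unexpected statistic never raises; it simply maps to UNKNOWN_OP.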
#payload_for_measurements(measurements) ⇒ Object
Generate a payload from the list of measurements. The payload is an array containing the number of elements in the string table, the string table itself, and the measurements.

    # File 'lib/spectator/registry.rb', line 255

    def payload_for_measurements(measurements)
      table = build_string_table(measurements)
      payload = []
      payload.push(table.length)
      strings = table.keys.sort
      payload.concat(strings)
      measurements.each { |m| append_measurement(payload, table, m) }
      payload
    end
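To make the overall layout concrete, the following sketch assembles a payload from a hand-written string table and one already-encoded measurement. `build_payload` and the sample data are hypothetical, and the example assumes no common tags.

```ruby
# Hypothetical helper: table size, then the sorted strings, then each
# encoded measurement appended in order.
def build_payload(table, encoded_measurements)
  payload = [table.length]
  payload.concat(table.keys.sort)
  encoded_measurements.each { |m| payload.concat(m) }
  payload
end

table = { count: 0, name: 1, requests: 2, statistic: 3 }
# One counter named :requests with tag statistic=count and value 3.0:
# 2 pairs, (statistic, count), (name, requests), ADD_OP, value.
encoded = [[2, 3, 0, 1, 2, 0, 3.0]]

payload = build_payload(table, encoded)
# payload => [4, :count, :name, :requests, :statistic, 2, 3, 0, 1, 2, 0, 3.0]
```

A receiver can read the first element to learn how many strings follow, rebuild the table by position, and then decode the measurements from the remaining integers and values.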
#publish ⇒ Object
Publish loop: send measurements to the aggregator endpoint ':uri' every ':frequency' seconds.

    # File 'lib/spectator/registry.rb', line 286

    def publish
      clock = @registry.clock
      until @should_stop
        start = clock.wall_time
        Spectator.logger.info 'Publishing'
        send_metrics_now
        elapsed = clock.wall_time - start
        sleep @frequency - elapsed if elapsed < @frequency
      end
      Spectator.logger.info 'Stopping publishing thread'
    end
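The timing behavior of the loop (sleep only for whatever remains of the interval after a send) can be sketched without threads or real waiting. In the example below, `run_loop`, the fake clock readings, and the recording `sleeper` are all hypothetical; the block stands in for `send_metrics_now`.

```ruby
# Hypothetical loop: runs a fixed number of iterations, measuring each send
# and sleeping only for the remainder of the interval.
def run_loop(frequency:, iterations:, clock:, sleeper:)
  iterations.times do
    start = clock.call
    yield
    elapsed = clock.call - start
    sleeper.call(frequency - elapsed) if elapsed < frequency
  end
end

readings = [0.0, 1.5, 10.0, 17.0].each # fake wall-clock values, in seconds
clock    = -> { readings.next }
sleeps   = []                          # records requested sleeps instead of waiting
sleeper  = ->(seconds) { sleeps << seconds }
sends    = 0

run_loop(frequency: 5, iterations: 2, clock: clock, sleeper: sleeper) { sends += 1 }
# First send took 1.5s, so the loop sleeps 3.5s; the second took 7s, so it
# does not sleep at all.
# sleeps => [3.5], sends => 2
```

When a send takes longer than the interval, the next iteration starts immediately rather than trying to catch up on missed intervals.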
#registry_measurements ⇒ Object
Get a list of measurements that should be sent.

    # File 'lib/spectator/registry.rb', line 266

    def registry_measurements
      @registry.measurements.select { |m| should_send(m) }
    end
#send_metrics_now ⇒ Object
Send the current measurements to our aggregator service.

    # File 'lib/spectator/registry.rb', line 271

    def send_metrics_now
      ms = registry_measurements

      if ms.empty?
        Spectator.logger.debug 'No measurements to send'
      else
        payload = payload_for_measurements(ms)
        uri = @registry.config[:uri]
        Spectator.logger.info "Sending #{ms.length} measurements to #{uri}"
        @http.post_json(uri, payload)
      end
    end
#should_send(measure) ⇒ Object
Gauges are sent if they have a value (that is, the value is not NaN). Counters are sent if their value is greater than 0. Measurements with an unknown operation are never sent.

    # File 'lib/spectator/registry.rb', line 193

    def should_send(measure)
      op = op_for_measurement(measure)
      return measure.value > 0 if op == ADD_OP
      return !measure.value.nan? if op == MAX_OP

      false
    end
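The filtering rule is easy to check with a few values. This sketch uses a hypothetical `sendable?` helper that takes the operation code and a Float value directly, rather than a Measure.

```ruby
ADD_OP = 0
MAX_OP = 10

# Hypothetical helper mirroring the rule: counters need a positive value,
# gauges need any value other than NaN, anything else is dropped.
def sendable?(op, value)
  return value > 0 if op == ADD_OP
  return !value.nan? if op == MAX_OP

  false
end

results = [
  sendable?(ADD_OP, 0.0),        # counter that was never incremented
  sendable?(ADD_OP, 3.0),        # counter with increments
  sendable?(MAX_OP, Float::NAN), # gauge that was never set
  sendable?(MAX_OP, 0.0),        # gauge explicitly set to zero
  sendable?(-1, 1.0)             # unknown operation
]
# results => [false, true, false, true, false]
```

Note the asymmetry: a gauge reporting 0.0 is still sent, while a counter at 0.0 is skipped.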
#should_start? ⇒ Boolean
    # File 'lib/spectator/registry.rb', line 124

    def should_start?
      if @started
        Spectator.logger.info('Ignoring start request. ' \
          'Spectator registry already started')
        return false
      end

      @started = true
      uri = @registry.config[:uri]
      if uri.nil? || uri.empty?
        Spectator.logger.info('Ignoring start request since Spectator ' \
          'registry has no valid uri')
        return false
      end

      true
    end
#start ⇒ Object
Start publishing if the config is acceptable: the uri is neither nil nor empty.

    # File 'lib/spectator/registry.rb', line 144

    def start
      return unless should_start?

      Spectator.logger.info 'Starting Spectator registry'
      @should_stop = false

      @publish_thread = Thread.new do
        publish
      end
    end
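The start condition can be exercised on its own with a small sketch. `StartGuard` is a hypothetical class that keeps only the two checks (already started, usable uri) and omits logging and threads; the URL is a placeholder, not a real endpoint.

```ruby
# Hypothetical guard: allows a start only once, and only with a usable uri.
class StartGuard
  def initialize(uri)
    @uri = uri
    @started = false
  end

  def should_start?
    return false if @started

    @started = true
    !(@uri.nil? || @uri.empty?)
  end
end

guard = StartGuard.new('http://example.invalid/aggregate')
first_call  = guard.should_start?               # true: not started, uri usable
second_call = guard.should_start?               # false: already started
no_uri      = StartGuard.new(nil).should_start? # false: no uri configured
empty_uri   = StartGuard.new('').should_start?  # false: empty uri
```

As in the documented method, the started flag is set before the uri is checked, so a guard created with a missing uri also rejects later start requests.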
#stop ⇒ Object
Stop publishing measurements.

    # File 'lib/spectator/registry.rb', line 156

    def stop
      unless @started
        Spectator.logger.info('Attemping to stop Spectator ' \
          'without a previous call to start')
        return
      end

      @should_stop = true
      Spectator.logger.info('Stopping spectator')
      @publish_thread.kill if @publish_thread

      @started = false
      Spectator.logger.info('Sending last batch of metrics before exiting')
      send_metrics_now
    end