Class: Spectator::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/spectator/registry.rb

Overview

Internal class used to publish measurements to an aggregator service

Constant Summary collapse

ADD_OP =
0
MAX_OP =
10
UNKNOWN_OP =
-1
OPS =
{ count: ADD_OP,
totalAmount: ADD_OP,
totalTime: ADD_OP,
totalOfSquares: ADD_OP,
percentile: ADD_OP,
max: MAX_OP,
gauge: MAX_OP,
activeTasks: MAX_OP,
duration: MAX_OP }.freeze

Instance Method Summary collapse

Constructor Details

#initialize(registry) ⇒ Publisher

Returns a new instance of Publisher.



116
117
118
119
120
121
122
# File 'lib/spectator/registry.rb', line 116

def initialize(registry)
  @registry = registry
  @started = false
  @should_stop = false
  @frequency = registry.config[:frequency] || 5
  @http = Http.new(registry)
end

Instance Method Details

#append_measurement(payload, table, measure) ⇒ Object

Add a measurement to our payload table. The serialization for a measurement is:

- length of tags
- indexes for the tags based on the string table
- operation (add (0), max (10))
- floating point value


232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/spectator/registry.rb', line 232

def append_measurement(payload, table, measure)
  op = op_for_measurement(measure)
  common_tags = @registry.common_tags
  tags = measure.id.tags
  len = tags.length + 1 + common_tags.length
  payload.push(len)
  common_tags.each do |k, v|
    payload.push(table[k])
    payload.push(table[v])
  end
  tags.each do |k, v|
    payload.push(table[k])
    payload.push(table[v])
  end
  payload.push(table[:name])
  payload.push(table[measure.id.name])
  payload.push(op)
  payload.push(measure.value)
end

#build_string_table(measurements) ⇒ Object

Build a string table from the list of measurements Unique words are identified, and assigned a number starting from 0 based on their lexicographical order



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/spectator/registry.rb', line 204

def build_string_table(measurements)
  common_tags = @registry.common_tags
  table = {}
  common_tags.each do |k, v|
    table[k] = 0
    table[v] = 0
  end
  table[:name] = 0
  measurements.each do |m|
    table[m.id.name] = 0
    m.id.tags.each do |k, v|
      table[k] = 0
      table[v] = 0
    end
  end
  keys = table.keys.sort
  keys.each_with_index do |str, index|
    table[str] = index
  end
  table
end

#op_for_measurement(measure) ⇒ Object

Get the operation to be used for the given Measure Gauges are aggregated using MAX_OP, counters with ADD_OP



186
187
188
189
# File 'lib/spectator/registry.rb', line 186

def op_for_measurement(measure)
  stat = measure.id.tags.fetch(:statistic, :unknown)
  OPS.fetch(stat, UNKNOWN_OP)
end

#payload_for_measurements(measurements) ⇒ Object

Generate a payload from the list of measurements The payload is an array, with the number of elements in the string table The string table, and measurements



255
256
257
258
259
260
261
262
263
# File 'lib/spectator/registry.rb', line 255

def payload_for_measurements(measurements)
  table = build_string_table(measurements)
  payload = []
  payload.push(table.length)
  strings = table.keys.sort
  payload.concat(strings)
  measurements.each { |m| append_measurement(payload, table, m) }
  payload
end

#publishObject

Publish loop:

send measurements to the aggregator endpoint ':uri',
every ':frequency' seconds


286
287
288
289
290
291
292
293
294
295
296
# File 'lib/spectator/registry.rb', line 286

def publish
  clock = @registry.clock
  until @should_stop
    start = clock.wall_time
    Spectator.logger.info 'Publishing'
    send_metrics_now
    elapsed = clock.wall_time - start
    sleep @frequency - elapsed if elapsed < @frequency
  end
  Spectator.logger.info 'Stopping publishing thread'
end

#registry_measurementsObject

Get a list of measurements that should be sent



266
267
268
# File 'lib/spectator/registry.rb', line 266

def registry_measurements
  @registry.measurements.select { |m| should_send(m) }
end

#send_metrics_nowObject

Send the current measurements to our aggregator service



271
272
273
274
275
276
277
278
279
280
281
# File 'lib/spectator/registry.rb', line 271

def send_metrics_now
  ms = registry_measurements
  if ms.empty?
    Spectator.logger.debug 'No measurements to send'
  else
    payload = payload_for_measurements(ms)
    uri = @registry.config[:uri]
    Spectator.logger.info "Sending #{ms.length} measurements to #{uri}"
    @http.post_json(uri, payload)
  end
end

#should_send(measure) ⇒ Object

Gauges are sent if they have a value Counters if they have a number of increments greater than 0



193
194
195
196
197
198
199
# File 'lib/spectator/registry.rb', line 193

def should_send(measure)
  op = op_for_measurement(measure)
  return measure.value > 0 if op == ADD_OP
  return !measure.value.nan? if op == MAX_OP

  false
end

#should_start?Boolean

Returns:

  • (Boolean)


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/spectator/registry.rb', line 124

def should_start?
  if @started
    Spectator.logger.info('Ignoring start request. ' \
      'Spectator registry already started')
    return false
  end

  @started = true
  uri = @registry.config[:uri]
  if uri.nil? || uri.empty?
    Spectator.logger.info('Ignoring start request since Spectator ' \
                              'registry has no valid uri')
    return false
  end

  true
end

#startObject

Start publishing if the config is acceptable:

uri is non-nil or empty


144
145
146
147
148
149
150
151
152
153
# File 'lib/spectator/registry.rb', line 144

def start
  return unless should_start?

  Spectator.logger.info 'Starting Spectator registry'

  @should_stop = false
  @publish_thread = Thread.new do
    publish
  end
end

#stopObject

Stop publishing measurements



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/spectator/registry.rb', line 156

def stop
  unless @started
    Spectator.logger.info('Attemping to stop Spectator ' \
      'without a previous call to start')
    return
  end

  @should_stop = true
  Spectator.logger.info('Stopping spectator')
  @publish_thread.kill if @publish_thread

  @started = false
  Spectator.logger.info('Sending last batch of metrics before exiting')
  send_metrics_now
end