Class: Metricstore::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/metricstore/client.rb

Constant Summary collapse

CARDINALITY_ESTIMATOR_ERROR_RATE =
0.05

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Client

:kvstore - the underlying key-value store.

:sleep_interval - sleep cycle length in seconds (required; the constructor applies no default, 0.1 is a typical value).
:max_retry_delay_in_seconds - maximum time to wait after an error.
:max_unhandled_errors - maximum retries before handling errors.
                        Set this >= max_healthy_errors.
:max_healthy_errors - maximum retries before healthy? returns false.
                      Set this <= max_unhandled_errors.


15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/metricstore/client.rb', line 15

# Builds a client around a key-value store and four background updaters
# (Inserter, Incrementer, RangeUpdater, CountIncrementer) that all receive
# the same options. Every option is read through +required+:
#   :kvstore, :sleep_interval, :max_healthy_errors, :max_unhandled_errors,
#   :max_retry_delay_in_seconds
# The client starts closed (@open = false); #open (or #run, which calls it)
# starts the updaters.
def initialize(opts={})
  @ttl_of_hours = 31_556_926 # 1 year

  @kvstore = required(opts, :kvstore)
  @sleep_interval = required(opts, :sleep_interval)
  @max_healthy_errors = required(opts, :max_healthy_errors)
  @max_unhandled_errors = required(opts, :max_unhandled_errors)
  @max_retry_delay_in_seconds = required(opts, :max_retry_delay_in_seconds)
  # Per-dimension TTL caps keyed by raw dimension name. Empty by default, so
  # writes fall back to ttl_of_hours alone.
  @max_ttl_of_dimension = {}

  # Every updater shares the same store and retry/health settings.
  updater_options = {
    :kvstore => @kvstore,
    :sleep_interval => @sleep_interval,
    :max_healthy_errors => @max_healthy_errors,
    :max_unhandled_errors => @max_unhandled_errors,
    :max_retry_delay_in_seconds => @max_retry_delay_in_seconds
  }
  @open = false
  @inserter = Inserter.new(updater_options)
  # The list-size bound handed to the inserter is 2.5x the HyperLogLog bucket
  # count for CARDINALITY_ESTIMATOR_ERROR_RATE.
  # NOTE(review): presumably the inserter stores 'overflow' instead of the
  # list once it grows past this (the #list / #estimated_list_size readers
  # check for 'overflow'). Confirm in Inserter; the rationale for the 2.5
  # factor is not visible here.
  bucket_count = 1 << HyperLogLog.bits_needed(CARDINALITY_ESTIMATOR_ERROR_RATE)
  @inserter.list_threshold = (2.5 * bucket_count).ceil
  @incrementer = Incrementer.new(updater_options)
  @range_updater = RangeUpdater.new(updater_options)
  @count_incrementer = CountIncrementer.new(updater_options)

  # Hook receiving (key, result, ttl) for range updates (per its name).
  # Confirm when RangeUpdater invokes it.
  # For a "range:" key whose result is :new or :grew, the reported amount is
  # fanned out to per-group aggregates. Each dimension in the key in turn
  # becomes the group, and the remaining dimensions form the key suffix for
  # the rangesum (amount), rangesumsqr (amount squared) and rangerange keys.
  # The rangecount key is also updated, but only when the range is new.
  # NOTE(review): assumes keys look like "range:/<hour>/<metric>?<dims>".
  # Confirm what +amount+ represents (range width vs. growth) in RangeUpdater.
  range_updater.handle_update_result = Proc.new do |key, result, ttl|
    if key.start_with?("range:") && !result.nil?
      new_or_grew, amount = result
      if new_or_grew == :new || new_or_grew == :grew
        _, time_block, metric_name, dimensions = key.split(/[\/\?]/)
        # With no "?<dims>" part (or an empty one), split yields no fourth
        # element, so there is nothing to group by.
        unless dimensions.nil?
          dimensions = dimensions.split('&')
          dimensions.size.times do |i|
            # Copy so that removing the group dimension leaves +dimensions+ intact.
            dimensions2 = dimensions.clone
            # dimension_value is not used below; only the dimension name becomes the group.
            group, dimension_value = dimensions2.delete_at(i).split('=')
            key_suffix = "#{time_block}/#{metric_name}/#{group}?#{dimensions2.join('&')}"
            incrementer.increment("rangesum:/#{key_suffix}", amount, ttl)
            incrementer.increment("rangesumsqr:/#{key_suffix}", amount * amount, ttl)
            range_updater.update_range("rangerange:/#{key_suffix}", amount, ttl)
            if new_or_grew == :new
              count_incrementer.increment("rangecount:/#{key_suffix}", 1, ttl)
            end
          end
        end
      end
    end
  end
end

Instance Attribute Details

#max_ttl_of_dimension ⇒ Object

Returns the value of attribute max_ttl_of_dimension.



106
107
108
# File 'lib/metricstore/client.rb', line 106

# Per-dimension TTL caps, keyed by raw dimension name. Writers take the
# smallest of these caps and #ttl_of_hours as the key TTL.
def max_ttl_of_dimension; @max_ttl_of_dimension; end

#ttl_of_hours ⇒ Object

Returns the value of attribute ttl_of_hours.



105
106
107
# File 'lib/metricstore/client.rb', line 105

# Default TTL for hourly keys (set to 31_556_926, one year, in #initialize).
# Per-dimension caps from #max_ttl_of_dimension can only shorten it.
def ttl_of_hours; @ttl_of_hours; end

Instance Method Details

#average(args = {}) ⇒ Object



217
218
219
220
221
222
223
224
225
# File 'lib/metricstore/client.rb', line 217

# Mean for one hour/metric/dimension set: the value stored under sum_key
# divided by the value stored under counter_key. Returns nil when either is
# missing or the count is zero.
def average(args={})
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  occurrences, _cas = kvstore.fetch(counter_key(hour, what, where))
  total, _cas = kvstore.fetch(sum_key(hour, what, where))
  if occurrences.nil? || total.nil? || occurrences == 0
    nil
  else
    total.to_f / occurrences
  end
end

#average_range(args = {}) ⇒ Object



272
273
274
275
276
277
278
279
280
281
# File 'lib/metricstore/client.rb', line 272

# Mean of the per-group range amounts: the value under range_sum_key divided
# by the value under group_counter_key for this hour/metric/group/dimensions.
# Returns nil when either is missing or the count is zero.
def average_range(args={})
  group = escape(required(args, :group))
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  groups_seen, _cas = kvstore.fetch(group_counter_key(hour, what, group, where))
  range_total, _cas = kvstore.fetch(range_sum_key(hour, what, group, where))
  if groups_seen.nil? || range_total.nil? || groups_seen == 0
    nil
  else
    range_total.to_f / groups_seen
  end
end

#backlog ⇒ Object



101
102
103
# File 'lib/metricstore/client.rb', line 101

# Total pending work across all four updaters (inserter, incrementer, range
# updater, count incrementer). #run polls this until it reaches zero.
def backlog
  [inserter, incrementer, range_updater, count_incrementer].map(&:backlog).inject(:+)
end

#close ⇒ Object



93
94
95
96
97
98
99
# File 'lib/metricstore/client.rb', line 93

# Marks the client closed, then stops the inserter, incrementer, range updater
# and count incrementer, in that order. Returns whatever the last stop! returned.
def close
  @open = false
  stop_results = [inserter, incrementer, range_updater, count_incrementer].map(&:stop!)
  stop_results.last
end

#count(args = {}) ⇒ Object



186
187
188
189
190
191
192
# File 'lib/metricstore/client.rb', line 186

# Reads the counter stored under counter_key for this hour/metric/dimensions.
# Returns 0 when nothing is stored.
def count(args={})
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  key = counter_key(hour, what, where)
  stored, _cas = kvstore.fetch(key)
  stored || 0
end

#count_of_groups(args = {}) ⇒ Object



254
255
256
257
258
259
260
261
# File 'lib/metricstore/client.rb', line 254

# Reads the counter stored under group_counter_key for this
# hour/metric/group/dimensions. Returns 0 when nothing is stored.
def count_of_groups(args={})
  group = escape(required(args, :group))
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  key = group_counter_key(hour, what, group, where)
  stored, _cas = kvstore.fetch(key)
  stored || 0
end

#counter(args = {}) ⇒ Object

A write method. :what => a String. Required. :when => a Time. Defaults to “now”. :where => a Hash<String, String> (dimension_name => value). Time complexity of this method grows factorially with the size of the :where hash.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/metricstore/client.rb', line 121

# Write method: records one occurrence of metric :what (String, required) in
# the hour of :when (defaults to Time.now, converted to UTC) under the :where
# dimensions (Hash of dimension name => value).
#
# Each +where+ entry is [raw name, raw value, "escaped_name=escaped_value",
# max TTL for that name or nil]. For every combination yielded by
# where.all_combinations, the matching count key is incremented by 1. Its TTL
# is the smallest of the entries' non-nil max TTLs and ttl_of_hours.
# Then, for each dimension in turn, its raw value is inserted into the list
# keyed by that dimension plus each combination of the remaining dimensions.
# The value is also fed to a HyperLogLog builder whose per-bucket updates go
# to range_updater under "hyperloglog:<idx>:/..." keys.
def counter(args={})
  # NOTE(review): presumably raises unless the client has been opened.
  assert_open!
  hour = date_as_hour((args[:when] || Time.now).utc)
  metric = escape(required(args, :what).to_s)
  where = (args[:where] || {}).map{|k,v| [k, v, escape(k) << '=' << escape(v), max_ttl_of_dimension[k]] }
  where.all_combinations do |dimensions|
    # Sorting by raw dimension name keeps key order stable for readers.
    key = counter_key(hour, metric, dimensions.sort.map{|k,v,s,ttl| s}.join('&'))
    ttl = (dimensions.map{|k,v,s,ttl| ttl} << ttl_of_hours).compact.min
    count_incrementer.increment(key, 1, ttl)
  end
  where.size.times do |i|
    # Shallow copy, so delete_at removes dimension i without touching +where+.
    where2 = where.clone
    list, dimension_value, _ = where2.delete_at(i)
    list = escape(list)
    key_middle = "#{hour}/#{metric}/#{list}?"
    where2.all_combinations do |dimensions|
      key_suffix = "#{key_middle}#{dimensions.sort.map{|k,v,s,ttl| s}.join('&')}"
      ttl = (dimensions.map{|k,v,s,ttl| ttl} << ttl_of_hours).compact.min
      inserter.insert("list:/#{key_suffix}", dimension_value, ttl)
      # Cardinality estimate backing #estimated_list_size once a list overflows.
      estimator = HyperLogLog::Builder.new(CARDINALITY_ESTIMATOR_ERROR_RATE, Proc.new do |idx, val|
        range_updater.update_range("hyperloglog:#{idx.to_i}:/#{key_suffix}", val, ttl)
      end)
      estimator.add(dimension_value)
    end
  end
end

#estimated_list_size(args = {}) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/metricstore/client.rb', line 313

# Number of distinct values recorded for the :list dimension of metric :what
# in the given hour, under the :where dimensions.
#
# * Nothing stored  -> 0, matching #list returning [] and #count returning 0.
#   (Previously this raised NoMethodError on nil.size.)
# * A stored list   -> its exact size.
# * 'overflow'      -> a HyperLogLog estimate built from the per-bucket
#   maxima stored under hyperloglog_key (the upper bound of each bucket range).
def estimated_list_size(args={})
  time_block = required(args, :hour)
  metric_name = escape(required(args, :what).to_s)
  list_name = escape(required(args, :list).to_s)
  dimensions = (args[:where] || {}).sort.map{|k,v| escape(k) << '=' << escape(v)}.join('&')
  list, _cas = kvstore.fetch(list_key(time_block, metric_name, list_name, dimensions))
  return 0 if list.nil?
  return list.size unless list == 'overflow'

  bucket_count = 1 << HyperLogLog.bits_needed(CARDINALITY_ESTIMATOR_ERROR_RATE)
  # Lazily fetch one bucket at a time; a missing bucket is yielded as nil.
  buckets = Enumerator.new do |yielder|
    bucket_count.times do |i|
      key = hyperloglog_key(i, time_block, metric_name, list_name, dimensions)
      range, _range_cas = kvstore.fetch(key)
      yielder << (range.nil? ? nil : range[1])
    end
  end
  HyperLogLog.estimate_cardinality(buckets)
end

#list(args = {}) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/metricstore/client.rb', line 194

# Values recorded for the :list dimension of metric :what in the given hour,
# under the :where dimensions. Returns [] when nothing is stored.
# Raises Metricstore::DataLossError when the stored list is 'overflow'; in
# that case only #estimated_list_size can report its size.
def list(args={})
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  list_name = escape(required(args, :list).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  values, _cas = kvstore.fetch(list_key(hour, what, list_name, where))
  return values || [] unless values == 'overflow'

  message = "Too many #{args[:list]} for #{hour}, #{args[:what]}"
  message << ", where #{args[:where].inspect}" unless where.empty?
  raise(Metricstore::DataLossError, message)
end

#list_threshold ⇒ Object



108
109
110
# File 'lib/metricstore/client.rb', line 108

# Current list-size threshold, delegated to the inserter. #initialize sets it
# from the HyperLogLog bucket count.
def list_threshold
  list_inserter = inserter
  list_inserter.list_threshold
end

#list_threshold=(threshold) ⇒ Object



112
113
114
# File 'lib/metricstore/client.rb', line 112

# Overrides the inserter's list-size threshold.
def list_threshold=(threshold)
  list_inserter = inserter
  list_inserter.list_threshold = threshold
end

#maximum(args = {}) ⇒ Object



227
228
229
230
231
232
233
# File 'lib/metricstore/client.rb', line 227

# Upper bound of the range stored under range_key for this
# hour/metric/dimensions, or nil when no range is stored.
def maximum(args={})
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  bounds, _cas = kvstore.fetch(range_key(hour, what, where))
  bounds[1] unless bounds.nil?
end

#maximum_range(args = {}) ⇒ Object



283
284
285
286
287
288
289
290
# File 'lib/metricstore/client.rb', line 283

# Upper bound of the range stored under group_range_key for this
# hour/metric/group/dimensions, or nil when no range is stored.
def maximum_range(args={})
  group = escape(required(args, :group))
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  bounds, _cas = kvstore.fetch(group_range_key(hour, what, group, where))
  bounds[1] unless bounds.nil?
end

#measure(args = {}) ⇒ Object

A write method. :value => an integer. Required. :what => a String. Required. :when => a Time. Defaults to “now”. :where => a Hash<String, String> (dimension_name => value). Time complexity of this method grows factorially with the size of the :where hash.



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/metricstore/client.rb', line 154

# Write method: records one measurement :value (converted with to_i) of metric
# :what (String, required) in the hour of :when (defaults to Time.now,
# converted to UTC) under the :where dimensions (Hash of dimension name => value).
#
# Each +where+ entry is [raw name, raw value, "escaped_name=escaped_value",
# max TTL for that name or nil]. For every combination yielded by
# where.all_combinations, four keys are updated:
#   count  += 1       (count_incrementer)
#   sum    += value   (incrementer)
#   range  <- value   (range_updater)
#   sumsqr += value^2 (incrementer)
# Each update uses a TTL equal to the smallest of the entries' non-nil max
# TTLs and ttl_of_hours. The list and HyperLogLog bookkeeping afterwards
# mirrors #counter.
def measure(args={})
  # NOTE(review): presumably raises unless the client has been opened.
  assert_open!
  value = required(args, :value).to_i
  hour = date_as_hour((args[:when] || Time.now).utc)
  metric = escape(required(args, :what).to_s)
  where = (args[:where] || {}).map{|k,v| [k, v, escape(k) << '=' << escape(v), max_ttl_of_dimension[k]] }
  where.all_combinations do |dimensions|
    # Sorting by raw dimension name keeps key order stable for readers.
    dimensions_string = dimensions.sort.map{|k,v,s,ttl| s}.join('&')
    ttl = (dimensions.map{|k,v,s,ttl| ttl} << ttl_of_hours).compact.min
    suffix = build_key('', hour, metric, dimensions_string)
    # count, sum and sumsqr go through different updaters, so readers may see
    # them at different points in time.
    count_incrementer.increment("count#{suffix}", 1, ttl)
    incrementer.increment("sum#{suffix}", value, ttl)
    range_updater.update_range("range#{suffix}", value, ttl)
    incrementer.increment("sumsqr#{suffix}", value*value, ttl)
  end
  where.size.times do |i|
    # Shallow copy, so delete_at removes dimension i without touching +where+.
    where2 = where.clone
    list, dimension_value, _ = where2.delete_at(i)
    list = escape(list)
    key_middle = "#{hour}/#{metric}/#{list}?"
    where2.all_combinations do |dimensions|
      key_suffix = "#{key_middle}#{dimensions.sort.map{|k,v,s,ttl| s}.join('&')}"
      ttl = (dimensions.map{|k,v,s,ttl| ttl} << ttl_of_hours).compact.min
      inserter.insert("list:/#{key_suffix}", dimension_value, ttl)
      # Cardinality estimate backing #estimated_list_size once a list overflows.
      estimator = HyperLogLog::Builder.new(CARDINALITY_ESTIMATOR_ERROR_RATE, Proc.new do |idx, val|
        range_updater.update_range("hyperloglog:#{idx.to_i}:/#{key_suffix}", val, ttl)
      end)
      estimator.add(dimension_value)
    end
  end
end

#minimum(args = {}) ⇒ Object



235
236
237
238
239
240
241
# File 'lib/metricstore/client.rb', line 235

# Lower bound of the range stored under range_key for this
# hour/metric/dimensions, or nil when no range is stored.
def minimum(args={})
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  bounds, _cas = kvstore.fetch(range_key(hour, what, where))
  bounds[0] unless bounds.nil?
end

#minimum_range(args = {}) ⇒ Object



292
293
294
295
296
297
298
299
# File 'lib/metricstore/client.rb', line 292

# Lower bound of the range stored under group_range_key for this
# hour/metric/group/dimensions, or nil when no range is stored.
def minimum_range(args={})
  group = escape(required(args, :group))
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  bounds, _cas = kvstore.fetch(group_range_key(hour, what, group, where))
  bounds[0] unless bounds.nil?
end

#open ⇒ Object



85
86
87
88
89
90
91
# File 'lib/metricstore/client.rb', line 85

# Starts the inserter, incrementer, range updater and count incrementer, in
# that order, then marks the client open (@open = true). Returns true.
def open
  [inserter, incrementer, range_updater, count_incrementer].each(&:start!)
  @open = true
end

#run(&callback) ⇒ Object

Use of this method is discouraged. Set up your own EventMachine reactor instead. Nevertheless, for a one-off, infrequent connection, this works too.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/metricstore/client.rb', line 67

# One-off convenience runner; prefer managing your own EventMachine reactor.
# Loads em-synchrony lazily and raises "Already running" if the client is open.
# Inside an EM.synchrony reactor it then:
#   1. opens the client,
#   2. invokes the given block,
#   3. polls #backlog every 0.01 s,
#   4. once the backlog is zero, cancels the poller, and on the next tick
#      closes the client and stops the reactor.
def run(&callback)
  require 'em-synchrony'
  raise("Already running") if @open
  EM.synchrony do
    open
    callback.call
    drain_poller = EM.add_periodic_timer(0.01) do
      next unless backlog == 0
      EM.cancel_timer(drain_poller)
      EM.next_tick do
        close
        EM.stop
      end
    end
  end
end

#stddev(args = {}) ⇒ Object



243
244
245
246
247
248
249
250
251
252
# File 'lib/metricstore/client.rb', line 243

# Population standard deviation for one hour/metric/dimension set, computed
# from the values stored under counter_key, sum_key and sumsqr_key:
#   sqrt(count * sumsqr - sum**2) / count
# Returns nil when any of the three is missing or the count is zero.
#
# The three values are fetched one at a time. They are also written by
# different updaters (#measure sends count to count_incrementer and
# sum/sumsqr to incrementer), so a read can observe them mid-update. Such a
# snapshot can make the radicand negative, and Math.sqrt would then raise
# Math::DomainError. The radicand is therefore clamped at zero.
def stddev(args={})
  time_block = required(args, :hour)
  metric_name = escape(required(args, :what).to_s)
  dimensions = (args[:where] || {}).sort.map{|k,v| escape(k) << '=' << escape(v)}.join('&')
  count, _cas = kvstore.fetch(counter_key(time_block, metric_name, dimensions))
  sum, _cas = kvstore.fetch(sum_key(time_block, metric_name, dimensions))
  sumsqr, _cas = kvstore.fetch(sumsqr_key(time_block, metric_name, dimensions))
  return nil if count.nil? || sum.nil? || sumsqr.nil? || count == 0
  radicand = count * sumsqr - sum * sum
  Math.sqrt([radicand, 0].max) / count
end

#stddev_of_ranges(args = {}) ⇒ Object



301
302
303
304
305
306
307
308
309
310
311
# File 'lib/metricstore/client.rb', line 301

# Population standard deviation of the per-group range amounts, computed from
# the values stored under group_counter_key, range_sum_key and range_sumsqr_key:
#   sqrt(count * sumsqr - sum**2) / count
# Returns nil when any of the three is missing or the count is zero.
#
# The three values are fetched one at a time. They are also maintained by
# different updaters (rangecount via count_incrementer, rangesum/rangesumsqr
# via incrementer), so an inconsistent snapshot can make the radicand
# negative, and Math.sqrt would then raise Math::DomainError. The radicand is
# therefore clamped at zero.
def stddev_of_ranges(args={})
  group = escape(required(args, :group))
  time_block = required(args, :hour)
  metric_name = escape(required(args, :what).to_s)
  dimensions = (args[:where] || {}).sort.map{|k,v| escape(k) << '=' << escape(v)}.join('&')
  count, _cas = kvstore.fetch(group_counter_key(time_block, metric_name, group, dimensions))
  sum, _cas = kvstore.fetch(range_sum_key(time_block, metric_name, group, dimensions))
  sumsqr, _cas = kvstore.fetch(range_sumsqr_key(time_block, metric_name, group, dimensions))
  return nil if count.nil? || sum.nil? || sumsqr.nil? || count == 0
  radicand = count * sumsqr - sum * sum
  Math.sqrt([radicand, 0].max) / count
end

#sum(args = {}) ⇒ Object



209
210
211
212
213
214
215
# File 'lib/metricstore/client.rb', line 209

# Reads the running total stored under sum_key for this hour/metric/dimensions
# (presumably accumulated by #measure). Returns 0 when nothing is stored.
def sum(args={})
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  key = sum_key(hour, what, where)
  stored, _cas = kvstore.fetch(key)
  stored || 0
end

#sum_of_ranges(args = {}) ⇒ Object



263
264
265
266
267
268
269
270
# File 'lib/metricstore/client.rb', line 263

# Reads the total stored under range_sum_key for this
# hour/metric/group/dimensions. Returns 0 when nothing is stored.
def sum_of_ranges(args={})
  group = escape(required(args, :group))
  hour = required(args, :hour)
  what = escape(required(args, :what).to_s)
  where = (args[:where] || {}).sort.map { |name, value| escape(name) << '=' << escape(value) }.join('&')
  key = range_sum_key(hour, what, group, where)
  stored, _cas = kvstore.fetch(key)
  stored || 0
end