Class: ThreeScale::Backend::Stats::BucketStorage

Inherits:
Object
Defined in:
lib/3scale/backend/stats/bucket_storage.rb

Overview

This class manages the buckets where stats keys are stored. Those buckets work as follows: a new bucket is created every few seconds (10 by default), and each bucket stores all the stats keys that changed during that bucket creation interval. The values of the keys stored in the buckets can be retrieved with a normal call to Redis.
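
For illustration, a minimal sketch of the idea. The bucket name format and the 10-second interval below are assumptions based on this overview, not taken from the source:

# Hypothetical sketch: derive a bucket name from the current time,
# truncated to a 10-second interval. All stats keys that change during
# that interval would be associated with this bucket; their values
# remain regular Redis keys.
BUCKET_INTERVAL = 10 # seconds; the default interval mentioned above

now = Time.now.utc
bucket_start = now - (now.to_i % BUCKET_INTERVAL)
bucket_name = bucket_start.strftime('%Y%m%d%H%M%S')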

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(storage) ⇒ BucketStorage

Returns a new instance of BucketStorage.



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 26

def initialize(storage)
  @storage = storage
end
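
A minimal construction sketch, assuming a Redis client from the redis gem as the storage object and '3scale/backend' as the library entry point; the actual backend wires in its own storage wrapper:

require 'redis'
require '3scale/backend'

# Any object that responds to the Redis commands used by this class
# (zadd, sadd, sunion, zrangebyscore, ...) can act as the storage.
redis = Redis.new(url: 'redis://localhost:6379')
bucket_storage = ThreeScale::Backend::Stats::BucketStorage.new(redis)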

Instance Attribute Details

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 24

def storage
  @storage
end

Instance Method Details

#buckets(first: '-inf', last: '+inf') ⇒ Object



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 54

def buckets(first: '-inf', last: '+inf')
  storage.zrangebyscore(Keys.changed_keys_key, first, last)
end
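
A usage sketch, assuming bucket names are timestamp-like values that double as their score in the sorted set (bucket_storage as constructed above; the bucket name is illustrative):

# All pending buckets, oldest first
all_buckets = bucket_storage.buckets

# Only buckets up to a given score
old_buckets = bucket_storage.buckets(last: '20200101000000')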

#content(buckets) ⇒ Object



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 73

def content(buckets)
  # Values are stored as strings in Redis, but we want integers.
  # There are some values that can be nil. This happens when the key
  # has a TTL and we read it once it has expired. Right now, event keys
  # with granularity = 'minute' expire after 180 s. We might need to
  # increase that to make sure that we do not miss any values.

  buckets.each_slice(MAX_BUCKETS_REDIS_UNION).inject([]) do |res, buckets_slice|
    bucket_keys = buckets_slice.map do |bucket|
      Keys.changed_keys_bucket_key(bucket)
    end
    (res + storage.sunion(bucket_keys))
  end.uniq
end
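
A usage sketch, continuing the example above:

buckets = bucket_storage.buckets
changed_keys = bucket_storage.content(buckets)
# => array of unique stats keys stored across those buckets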

#delete_all_buckets_and_keys(options = {}) ⇒ Object



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 41

def delete_all_buckets_and_keys(options = {})
  Storage.disable!

  buckets.each do |bucket|
    keys = storage.smembers(Keys.changed_keys_bucket_key(bucket))
    unless options[:silent]
      puts "Deleting bucket: #{bucket}, containing #{keys.size} keys"
    end
    storage.del(Keys.changed_keys_bucket_key(bucket))
  end
  storage.del(Keys.changed_keys_key)
end
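
A usage sketch. Note that the method calls Storage.disable!, so it is intended for maintenance tasks rather than the regular request path:

# Print each deleted bucket and its key count
bucket_storage.delete_all_buckets_and_keys

# Same, but without output
bucket_storage.delete_all_buckets_and_keys(silent: true)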

#delete_range(last_bucket) ⇒ Object

For each bucket in the range, deletes it from the set of pending buckets and also deletes its contents.



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 32

def delete_range(last_bucket)
  buckets = storage.zrangebyscore(Keys.changed_keys_key, 0, last_bucket)

  storage.pipelined do
    buckets.each { |bucket| delete_bucket_content(bucket) }
    storage.zremrangebyscore(Keys.changed_keys_key, 0, last_bucket)
  end
end
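
A usage sketch, assuming each bucket's score is its timestamp-like name (illustrative value):

# Delete every pending bucket up to (and including) the given one,
# together with the keys stored in each of them.
bucket_storage.delete_range('20200101000000')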

#pending_buckets_size ⇒ Object



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 58

def pending_buckets_size
  storage.zcard(Keys.changed_keys_key)
end
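
For example:

bucket_storage.pending_buckets_size
# => number of buckets that have not been processed yet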

#pending_keys_by_bucket ⇒ Object



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 88

def pending_keys_by_bucket
  bucket_keys = buckets.map do |bucket|
    Keys.changed_keys_bucket_key(bucket)
  end

  cardinalities = storage.pipelined do
    bucket_keys.map { |bucket_key| storage.scard(bucket_key) }
  end

  Hash[buckets.zip(cardinalities)]
end
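
A usage sketch with illustrative bucket names and counts:

bucket_storage.pending_keys_by_bucket
# => { '20200101000000' => 42, '20200101000010' => 17 }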

#put_in_bucket(event_keys, bucket) ⇒ Object

Puts keys in a bucket. The bucket is created if it does not exist. We could have chosen to fill the bucket only if it already existed, but that would hurt performance: every call would need to fetch all the existing buckets just to check whether the given one exists.



# File 'lib/3scale/backend/stats/bucket_storage.rb', line 66

def put_in_bucket(event_keys, bucket)
  storage.pipelined do
    storage.zadd(Keys.changed_keys_key, bucket, bucket)
    storage.sadd(Keys.changed_keys_bucket_key(bucket), event_keys)
  end
end
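
A usage sketch, assuming event_keys is an array of stats keys and the bucket name is a timestamp-like value that can be used as the sorted-set score (both values below are illustrative, not taken from the source):

event_keys = ['stats/{service:1001}/metric:10/day:20200101'] # illustrative key format
bucket_storage.put_in_bucket(event_keys, '20200101000000')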