Class: HeavyKeeper::TopK

Inherits:
Object
Defined in:
lib/heavy_keeper/top_k.rb

Overview

A Redis-backed Top-K data structure (DS) based on the HeavyKeeper algorithm.

Constant Summary

Validator =
::Dry::Schema.Params do
  required(:top_k).filled(:integer, gt?: 0)
  required(:width).filled(:integer, gt?: 0)
  required(:depth).filled(:integer, gt?: 0)
  required(:decay).filled(:decimal, gt?: 0, lteq?: 1)
end
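Validator is a `Dry::Schema.Params` schema, so string input (such as the hash that `hgetall` returns in `#increase_by` and `#remove`) is coerced before the rules are checked. As a rough illustration of what the rules accept, here is a hypothetical plain-Ruby approximation. The name `validate_topk_options`, the use of `Float` in place of dry-schema's `:decimal` type, and the `ArgumentError` error format are assumptions of this sketch, not part of heavy_keeper or dry-schema.

```ruby
# Hypothetical plain-Ruby approximation of the Validator rules above.
# Assumptions: Float stands in for dry-schema's :decimal type, and the
# ArgumentError message format is invented for this sketch.
INTEGER_KEYS = %i[top_k width depth].freeze

def validate_topk_options(raw)
  # Redis returns string keys and values, so accept both forms.
  opts = raw.transform_keys(&:to_sym)
  result = {}
  errors = {}

  INTEGER_KEYS.each do |name|
    # Base 10 so that strings such as "010" are not read as octal.
    value = Integer(opts[name].to_s, 10, exception: false)
    if value.nil? || value <= 0
      errors[name] = "must be an integer greater than 0"
    else
      result[name] = value
    end
  end

  decay = Float(opts[:decay].to_s, exception: false)
  if decay.nil? || decay <= 0 || decay > 1
    errors[:decay] = "must be a number greater than 0 and at most 1"
  else
    result[:decay] = decay
  end

  raise ArgumentError, errors.inspect unless errors.empty?

  result
end
```

For example, `validate_topk_options({ "top_k" => "10", "width" => "2000", "depth" => "5", "decay" => "0.9" })` coerces every value, while a decay of `1.5` or a `top_k` of `0` is rejected.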

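Instance Method Summary

  • #add(key, *items) ⇒ Array[Nil, Integer]
    Adds one or more items to a Top-K DS.
  • #clear(key) ⇒ Object
    Removes all Redis data related to a key.
  • #count(key, *items) ⇒ Array[Integer]
    Returns the estimated count of each item.
  • #increase_by(key, *items_and_increments) ⇒ Array[Nil, String]
    Adds one or more items to a Top-K DS, with a custom increment for each item.
  • #list(key) ⇒ Hash
    Returns the full list of items in the Top-K list.
  • #query(key, *items) ⇒ Array[Boolean]
    Checks whether each item is one of the Top-K items.
  • #remove(key, item) ⇒ Object
    Resets the counters of an item to zero so that it decays out.
  • #reserve(key, options) ⇒ Object
    Initializes a Top-K DS in Redis with the specified parameters.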

Constructor Details

#initialize(storage: HeavyKeeper.config.storage) ⇒ HeavyKeeper::TopK

Initializes a controller that creates and operates on Top-K data structures.

Parameters:

  • storage (Redis) (defaults to: HeavyKeeper.config.storage)

    the Redis client used to store and read Top-K data



# File 'lib/heavy_keeper/top_k.rb', line 22

def initialize(storage: HeavyKeeper.config.storage)
  @storage = storage
  @min_heap = MinHeap.new(storage)
  @bucket = Bucket.new(storage)
end

Instance Method Details

#add(key, *items) ⇒ Array[Nil, Integer]

Complexity: O(k + depth). Adds one or more items to a Top-K DS, incrementing the count of each item by 1.

Parameters:

  • key (String)

    a key for identifying the Top-K DS in Redis

  • items (String, String, ...)

    each value represents an item we want to store in Top-K

Returns:

  • (Array[Nil, Integer])

    nil if the item was not added to the list; otherwise, the current value of the item



# File 'lib/heavy_keeper/top_k.rb', line 59

def add(key, *items)
  items_and_increments = items.map { |item| [item, 1] }
  increase_by(key, *items_and_increments)
end

#clear(key) ⇒ Object

Complexity: O(1). Removes all Redis data related to a key.

Parameters:

  • key (String)

    a key for identifying top-k DS in Redis

Returns:

  • OK if successful; otherwise, raises an error



# File 'lib/heavy_keeper/top_k.rb', line 175

def clear(key)
  storage.multi do
    storage.del((key))
    min_heap.clear(key)
    bucket.clear(key)
  end
end

#count(key, *items) ⇒ Array[Integer]

Complexity: O(k + depth). Returns the estimated count of each item. The estimate is never higher than the real count and is likely to be lower. Multiple items can be queried at once.

Parameters:

  • key (String)

    a key for identifying top-k DS in Redis

  • items (String, String, ...)

    one or more items to check

Returns:

  • (Array[Integer])

    the count of each item



# File 'lib/heavy_keeper/top_k.rb', line 151

def count(key, *items)
  items.map do |item|
    min_heap.count(key, item)
  end
end

#increase_by(key, *items_and_increments) ⇒ Array[Nil, String]

Complexity: O(k + (increment * depth)). Adds one or more items to a Top-K DS, with a custom increment for each item.

Parameters:

  • key (String)

    a key for identifying the Top-K DS in Redis

  • items_and_increments ([String, Integer], ...)

    each value represents an item and increment that needs to be added to Top-K

Returns:

  • (Array[Nil, String])

    nil if the item was not added to the list; otherwise, the current value of the item



# File 'lib/heavy_keeper/top_k.rb', line 79

def increase_by(key, *items_and_increments)
  options = validate(storage.hgetall((key)))

  items_and_increments.map do |(item, increment)|
    max_count = 0
    item_fingerprint = XXhash.xxh64(item)

    exist = min_heap.exist?(key, item)
    min_value = min_heap.min(key)

    options[:depth].times do |i|
      bucket_number = XXhash.xxh64_stream(StringIO.new(item), i) % options[:width]

      fingerprint, count = bucket.get(key, i, bucket_number)

      if count.nil? || count.zero?
        bucket.set(key, i, bucket_number, [item_fingerprint, increment])
        max_count = [increment, max_count].max
      elsif fingerprint == item_fingerprint
        if exist || count <= min_value
          bucket.set(key, i, bucket_number, [fingerprint, count + increment])
          max_count = [count + increment, max_count].max
        end
      else
        decay = options[:decay]**count

        if SecureRandom.rand < decay
          count -= increment

          if count.positive?
            bucket.set(key, i, bucket_number, [fingerprint, count])
          else
            bucket.set(key, i, bucket_number, [item_fingerprint, increment])
            max_count = [increment, max_count].max
          end
        end
      end
    end

    if exist
      min_heap.update(key, item, max_count)
    else
      min_heap.add(key, item, max_count, options[:top_k])
    end
  end
end
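The update loop above can be modeled in memory to see how the HeavyKeeper rules interact. The following is a minimal sketch under several assumptions: the class `InMemoryTopK` is hypothetical, `Zlib.crc32` stands in for XXhash (the row index is mixed into the hashed string instead of being passed as a seed), nested Ruby arrays stand in for `Bucket`, and a `Hash` with a linear minimum scan stands in for `MinHeap`. Because `MinHeap#min` and `MinHeap#add` are not shown on this page, the sketch also assumes that no minimum applies while the heap has spare room, and that a new item evicts the smallest tracked item only when its count is strictly larger.

```ruby
require "zlib"

# Minimal in-memory model of TopK#increase_by (illustrative only).
# Assumptions: Zlib.crc32 replaces XXhash, nested arrays replace Bucket,
# and a Hash with a linear minimum scan replaces MinHeap.
class InMemoryTopK
  def initialize(top_k:, width:, depth:, decay:, rng: Random.new)
    @top_k = top_k
    @width = width
    @depth = depth
    @decay = decay
    @rng = rng
    # depth rows, each holding width buckets of [fingerprint, count].
    @rows = Array.new(depth) { Array.new(width) { [nil, 0] } }
    @heap = {} # tracked item => count
  end

  def increase_by(*items_and_increments)
    items_and_increments.map do |(item, increment)|
      max_count = 0
      item_fingerprint = Zlib.crc32(item)
      exist = @heap.key?(item)
      # Assumption: while the heap has room, no minimum threshold applies.
      min_value = @heap.size < @top_k ? Float::INFINITY : @heap.values.min

      @depth.times do |i|
        # Mix the row index into the hashed string to get one hash per row.
        bucket_number = Zlib.crc32("#{i}:#{item}") % @width
        fingerprint, count = @rows[i][bucket_number]

        if count.zero?
          # Empty (or zeroed) bucket: claim it for this item.
          @rows[i][bucket_number] = [item_fingerprint, increment]
          max_count = [increment, max_count].max
        elsif fingerprint == item_fingerprint
          # Bucket owned by this item: grow it if the item is tracked
          # or could still enter the Top-K list.
          if exist || count <= min_value
            @rows[i][bucket_number] = [fingerprint, count + increment]
            max_count = [count + increment, max_count].max
          end
        elsif @rng.rand < @decay**count
          # Collision: decay the other item's counter with probability decay**count.
          count -= increment
          if count.positive?
            @rows[i][bucket_number] = [fingerprint, count]
          else
            @rows[i][bucket_number] = [item_fingerprint, increment]
            max_count = [increment, max_count].max
          end
        end
      end

      track(item, max_count, exist)
    end
  end

  # Tracked items sorted by descending count.
  def list
    @heap.sort_by { |_item, count| -count }.to_h
  end

  private

  # Returns the item's tracked count, or nil if it did not make the list.
  def track(item, count, exist)
    if exist || @heap.size < @top_k
      @heap[item] = count
    else
      min_item, min_count = @heap.min_by { |_item, c| c }
      return nil unless count > min_count

      @heap.delete(min_item)
      @heap[item] = count
    end
  end
end
```

Calling `increase_by([item, 1])` for each item mirrors `#add`. Passing a seeded `Random` as `rng:` makes the probabilistic decay reproducible, which is convenient when experimenting with different `width`, `depth`, and `decay` values.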

#list(key) ⇒ Hash

Complexity: O(k). Returns the full list of items in the Top-K list.

Parameters:

  • key (String)

    a key for identifying top-k DS in Redis

Returns:

  • (Hash)

    a hash mapping each of the top-K items to its count



# File 'lib/heavy_keeper/top_k.rb', line 164

def list(key)
  top_k = storage.hget((key), :top_k).to_i
  min_heap.list(key, top_k)
end

#query(key, *items) ⇒ Array[Boolean]

Complexity: O(k). Checks whether each item is one of the Top-K items. Multiple items can be checked at once.

Parameters:

  • key (String)

    a key for identifying top-k DS in Redis

  • items (String, String, ...)

    one or more items to check

Returns:

  • (Array[Boolean])

    true if the item is in the Top-K list; otherwise, false



# File 'lib/heavy_keeper/top_k.rb', line 137

def query(key, *items)
  items.map do |item|
    min_heap.exist?(key, item)
  end
end

#remove(key, item) ⇒ Object

Complexity: O(1). Resets the counters of an item to zero so that it decays out, and removes the item from the min-heap.

Parameters:

  • key (String)

    a key for identifying top-k DS in Redis

  • item (String)

    the item to decay out

Returns:

  • OK if successful; otherwise, raises an error



# File 'lib/heavy_keeper/top_k.rb', line 190

def remove(key, item)
  options = validate(storage.hgetall((key)))
  item_fingerprint = XXhash.xxh64(item)

  options[:depth].times do |i|
    bucket_number = XXhash.xxh64_stream(StringIO.new(item), i) % options[:width]
    fingerprint, _ = bucket.get(key, i, bucket_number)

    bucket.set(key, i, bucket_number, [fingerprint, 0]) if item_fingerprint == fingerprint
  end

  min_heap.delete(key, item)
end

#reserve(key, options) ⇒ Object

Complexity: O(1). Initializes a Top-K DS in Redis with the specified parameters.

Parameters:

  • key (String)

    a key for identifying top-k DS in Redis

  • options (Hash)

    parameters of the Top-K DS

Options Hash (options):

  • :top_k (Integer)

    number of top elements to track

  • :width (Integer)

    number of counters in each bucket array

  • :depth (Integer)

    number of bucket arrays, each indexed by a different hash seed

  • :decay (Decimal)

    decay factor in (0, 1]: a smaller value means a bigger distinction between mouse flows and elephant flows

Returns:

  • OK on success; otherwise, raises an error



# File 'lib/heavy_keeper/top_k.rb', line 39

def reserve(key, options)
  options = validate(options)
  # Validate decay as a Decimal so that integer values are accepted as
  # well. Convert it to a Float before saving to Redis, because redis-rb 5
  # does not support the Decimal type.
  options[:decay] = options[:decay].to_f

  storage.mapped_hmset((key), options)
end
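The `decay` option takes effect in `#increase_by` through the expression `options[:decay]**count`: when an item hashes to a bucket owned by a different fingerprint, that bucket's counter is decremented with probability decay^count. The short sketch below (the helper `decay_probability` is hypothetical) compares how likely a small and a large counter are to be decayed for two decay values.

```ruby
# Probability that a colliding update decays a bucket whose counter is
# `count`, following the `options[:decay]**count` expression in #increase_by.
# decay_probability is a hypothetical helper for illustration.
def decay_probability(decay, count)
  decay**count
end

[0.9, 0.5].each do |decay|
  small = decay_probability(decay, 1)
  large = decay_probability(decay, 10)
  puts format("decay=%.1f  count=1: %.4f  count=10: %.6f  small/large: %.1f",
              decay, small, large, small / large)
end
```

With decay 0.5, a counter of 10 is 512 times less likely to be decayed than a counter of 1; with decay 0.9, the ratio is only about 2.6. This is the sense in which a smaller decay value separates mouse flows from elephant flows more sharply.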