Class: Gitlab::Counters::BufferedCounter

Inherits:
Object
  • Object
show all
Includes:
ExclusiveLeaseHelpers
Defined in:
lib/gitlab/counters/buffered_counter.rb

Constant Summary collapse

# Delay passed to FlushCounterIncrementsWorker.perform_in whenever a flush is scheduled.
WORKER_DELAY =
10.minutes
# NOTE(review): not referenced in the visible code; presumably the TTL of the exclusive
# lease taken around the flush in #commit_increment! - confirm against with_exclusive_lease.
WORKER_LOCK_TTL =
10.minutes
REFRESH_KEYS_TTL =

Refresh keys are given a very long expiry so that they do not occupy Redis memory indefinitely if, for any reason, they are never deleted. In practice, a refresh is not expected to take longer than this TTL.

14.days
# Maximum number of members popped from the shards set per iteration of #cleanup_refresh.
CLEANUP_BATCH_SIZE =
50
# Seconds slept between two full batches in #cleanup_refresh.
CLEANUP_INTERVAL_SECONDS =
0.1
MAX_BITMAP_OFFSET =

Limit the bitmap offset to 2^26-1, which caps the size of each bitmap key at ~8MB.

67108863
# Lua script evaluated with KEYS = [increment_key, flushed_key] (see #amount_to_be_flushed).
#
# Adds the value stored at increment_key (0 when the key is missing) to flushed_key and
# deletes increment_key. When the resulting flushed value is 0, flushed_key is deleted too.
# Returns the resulting flushed value.
LUA_FLUSH_INCREMENT_SCRIPT =
<<~LUA
  local increment_key, flushed_key = KEYS[1], KEYS[2]
  local increment_value = redis.call("get", increment_key) or 0
  local flushed_value = redis.call("incrby", flushed_key, increment_value)
  if flushed_value == 0 then
    redis.call("del", increment_key, flushed_key)
  else
    redis.call("del", increment_key)
  end
  return flushed_value
LUA
# Lua script evaluated with
#   KEYS = [counter_key, refresh_key, refresh_indicator_key,
#           tracking_shard_key, opposing_tracking_shard_key, shards_key]
#   ARGV = [amount, tracking_offset]
#
# When refresh_indicator_key does not exist, the amount is simply added to counter_key
# and the new counter value is returned.
#
# While refresh_indicator_key exists:
# - if the bit at tracking_offset in tracking_shard_key is already set, nothing is
#   changed and the current value of refresh_key is returned;
# - otherwise the bit is set, tracking_shard_key is added to the shards_key set, and
#   both keys get their expiry set to REFRESH_KEYS_TTL seconds;
# - the amount is then applied to refresh_key only when it is positive and the bit in
#   opposing_tracking_shard_key is unset, or negative and that bit is set; in any other
#   case refresh_key is incremented by 0.
# Returns the resulting value of refresh_key.
LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT =
<<~LUA
  local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
  local tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6]

  local amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2])

  -- increment to the counter key when not refreshing
  if redis.call("exists", refresh_indicator_key) == 0 then
    return redis.call("incrby", counter_key, amount)
  end

  -- deduplicate and increment to the refresh counter key while refreshing
  local found_duplicate = redis.call("getbit", tracking_shard_key, tracking_offset)
  if found_duplicate == 1 then
    return redis.call("get", refresh_key)
  end

  redis.call("setbit", tracking_shard_key, tracking_offset, 1)
  redis.call("expire", tracking_shard_key, #{REFRESH_KEYS_TTL.seconds})
  redis.call("sadd", shards_key, tracking_shard_key)
  redis.call("expire", shards_key, #{REFRESH_KEYS_TTL.seconds})

  local found_opposing_change = redis.call("getbit", opposing_tracking_shard_key, tracking_offset)
  local increment_without_previous_decrement = amount > 0 and found_opposing_change == 0
  local decrement_with_previous_increment = amount < 0 and found_opposing_change == 1
  local net_change = 0

  if increment_without_previous_decrement or decrement_with_previous_increment then
    net_change = amount
  end

  return redis.call("incrby", refresh_key, net_change)
LUA
# Lua script evaluated with KEYS = [counter_key, refresh_indicator_key] (see #initiate_refresh!).
#
# Deletes counter_key and sets refresh_indicator_key to 1 with an expiry of
# REFRESH_KEYS_TTL seconds.
LUA_INITIATE_REFRESH_SCRIPT =
<<~LUA
  local counter_key, refresh_indicator_key = KEYS[1], KEYS[2]
  redis.call("del", counter_key)
  redis.call("set", refresh_indicator_key, 1, "ex", #{REFRESH_KEYS_TTL.seconds})
LUA
# Lua script evaluated with KEYS = [counter_key, refresh_key, refresh_indicator_key]
# (see #finalize_refresh).
#
# Adds the value stored at refresh_key (0 when the key is missing) to counter_key, then
# deletes both refresh_indicator_key and refresh_key.
LUA_FINALIZE_REFRESH_SCRIPT =
<<~LUA
  local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
  local refresh_amount = redis.call("get", refresh_key) or 0

  redis.call("incrby", counter_key, refresh_amount)
  redis.call("del", refresh_indicator_key, refresh_key)
LUA

Constants included from ExclusiveLeaseHelpers

ExclusiveLeaseHelpers::FailedToObtainLockError

Instance Method Summary collapse

Methods included from ExclusiveLeaseHelpers

#in_lock

Constructor Details

#initialize(counter_record, attribute) ⇒ BufferedCounter

Returns a new instance of BufferedCounter.



34
35
36
37
# File 'lib/gitlab/counters/buffered_counter.rb', line 34

# Stores the record and the name of the attribute this counter operates on.
#
# @param counter_record the record whose attribute is being counted
# @param attribute the name of the counter attribute on that record
def initialize(counter_record, attribute)
  @attribute = attribute
  @counter_record = counter_record
end

Instance Method Details

#amount_to_be_flushed ⇒ Object

amount_to_be_flushed returns the total value to be flushed. The total value is the sum of the following:

  • current value in the increment_key

  • any existing value in the flushed_key that has not been flushed



165
166
167
168
169
# File 'lib/gitlab/counters/buffered_counter.rb', line 165

# Evaluates LUA_FLUSH_INCREMENT_SCRIPT against the increment key and the flushed key
# and returns whatever the script returns.
#
# @return the result of the script evaluation, as yielded back by redis_state
def amount_to_be_flushed
  redis_state { |conn| conn.eval(LUA_FLUSH_INCREMENT_SCRIPT, keys: [key, flushed_key]) }
end

#bulk_increment(increments) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/gitlab/counters/buffered_counter.rb', line 89

# Applies several increments in a single Redis pipeline and schedules a flush.
#
# Each increment is sent through LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT with the
# arguments built by increment_args.
#
# @param increments an enumerable of increments accepted by increment_args
# @return [Integer] the last pipeline reply converted with to_i (0 when there is none)
def bulk_increment(increments)
  replies = redis_state do |conn|
    conn.pipelined do |batch|
      increments.each do |entry|
        script_args = increment_args(entry)
        batch.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **script_args)
      end
    end
  end

  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)

  replies.last.to_i
end

#cleanup_refresh ⇒ Object



134
135
136
137
138
139
140
141
142
143
# File 'lib/gitlab/counters/buffered_counter.rb', line 134

# Deletes the tracking shard keys recorded in the shards set, in batches.
#
# Pops up to CLEANUP_BATCH_SIZE members from shards_key at a time and deletes the
# popped keys, sleeping CLEANUP_INTERVAL_SECONDS between full batches. The loop ends
# when a pop returns fewer members than a full batch.
#
# A pop that comes back empty (or nil) ends the loop immediately, so DEL is never
# issued without any keys.
#
# @return [nil]
def cleanup_refresh
  redis_state do |redis|
    loop do
      shards = Array(redis.spop(shards_key, CLEANUP_BATCH_SIZE))
      break if shards.empty?

      redis.del(*shards)
      break if shards.size < CLEANUP_BATCH_SIZE

      sleep CLEANUP_INTERVAL_SECONDS
    end
  end
end

#commit_increment! ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/gitlab/counters/buffered_counter.rb', line 145

# Flushes the buffered amount into the record while holding the exclusive lease.
#
# Inside the lease the amount returned by amount_to_be_flushed is read; when it is
# not 0, the record's counter is updated and the flushed key is removed within a
# single transaction on the record, after which the record's after-commit callbacks
# are executed.
#
# @return the attribute value read from the record after calling reset on it
def commit_increment!
  with_exclusive_lease do
    pending_amount = amount_to_be_flushed

    unless pending_amount == 0
      counter_record.transaction do
        counter_record.update_counters_with_lease({ attribute => pending_amount })
        remove_flushed_key
      end

      counter_record.execute_after_commit_callbacks
    end
  end

  counter_record.reset.read_attribute(attribute)
end

#finalize_refresh ⇒ Object



125
126
127
128
129
130
131
132
# File 'lib/gitlab/counters/buffered_counter.rb', line 125

# Finishes a refresh: evaluates LUA_FINALIZE_REFRESH_SCRIPT against the counter,
# refresh and refresh-indicator keys, then schedules a delayed flush and an
# asynchronous cleanup for this record and attribute.
#
# @return the result of ::Counters::CleanupRefreshWorker.perform_async
def finalize_refresh
  redis_state do |conn|
    script_keys = [key, refresh_key, refresh_indicator_key]
    conn.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: script_keys)
  end

  worker_args = [counter_record.class.name, counter_record.id, attribute]
  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, *worker_args)
  ::Counters::CleanupRefreshWorker.perform_async(*worker_args)
end

#flushed_key ⇒ Object



179
180
181
# File 'lib/gitlab/counters/buffered_counter.rb', line 179

# Redis key derived from the counter key by appending the ":flushed" suffix.
#
# @return [String]
def flushed_key
  base_key = key
  "#{base_key}:flushed"
end

#get ⇒ Object



39
40
41
42
43
# File 'lib/gitlab/counters/buffered_counter.rb', line 39

# Reads the value stored at the counter key.
#
# @return [Integer] the stored value converted with to_i (0 when the key is missing)
def get
  redis_state { |conn| conn.get(key).to_i }
end

#increment(increment) ⇒ Object



79
80
81
82
83
84
85
86
87
# File 'lib/gitlab/counters/buffered_counter.rb', line 79

# Applies a single increment through LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT and
# schedules a delayed flush for this record and attribute.
#
# @param increment an increment accepted by increment_args
# @return [Integer] the script result converted with to_i
def increment(increment)
  new_value = redis_state do |conn|
    script_args = increment_args(increment)
    conn.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **script_args).to_i
  end

  new_value.tap do
    FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute)
  end
end

#initiate_refresh! ⇒ Object



109
110
111
112
113
114
115
# File 'lib/gitlab/counters/buffered_counter.rb', line 109

# Starts a refresh: sets the attribute to 0 on the record, then evaluates
# LUA_INITIATE_REFRESH_SCRIPT against the counter key and the refresh-indicator key.
#
# @return the result of the script evaluation, as yielded back by redis_state
def initiate_refresh!
  counter_record.update!(attribute => 0)

  redis_state { |conn| conn.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: [key, refresh_indicator_key]) }
end

#key ⇒ Object



171
172
173
174
175
176
177
# File 'lib/gitlab/counters/buffered_counter.rb', line 171

# Builds the Redis key of this counter from the record's project id, the record's
# class, the record's id and the attribute name.
#
# @return [String]
def key
  record = counter_record

  "project:{#{record.project.id}}:counters:#{record.class}:#{record.id}:#{attribute}"
end

#refresh_indicator_key ⇒ Object



183
184
185
# File 'lib/gitlab/counters/buffered_counter.rb', line 183

# Redis key derived from the counter key by appending the ":refresh-in-progress" suffix.
#
# @return [String]
def refresh_indicator_key
  base_key = key
  "#{base_key}:refresh-in-progress"
end

#refresh_key ⇒ Object



187
188
189
# File 'lib/gitlab/counters/buffered_counter.rb', line 187

# Redis key derived from the counter key by appending the ":refresh" suffix.
#
# @return [String]
def refresh_key
  base_key = key
  "#{base_key}:refresh"
end