Class: Gitlab::Counters::BufferedCounter
- Inherits: Object
  - Object
    - Gitlab::Counters::BufferedCounter
- Includes:
- ExclusiveLeaseHelpers
- Defined in:
- lib/gitlab/counters/buffered_counter.rb
Constant Summary collapse
- WORKER_DELAY =
10.minutes
- WORKER_LOCK_TTL =
10.minutes
- REFRESH_KEYS_TTL =
Refresh keys are set to expire after a very long time, so that they do not occupy Redis memory indefinitely, if for any reason they are not deleted. In practice, a refresh is not expected to take longer than this TTL.
14.days
- CLEANUP_BATCH_SIZE =
50
- CLEANUP_INTERVAL_SECONDS =
0.1
- MAX_BITMAP_OFFSET =
Limit size of bitmap key to 2^26-1 (~8MB)
67108863
- LUA_FLUSH_INCREMENT_SCRIPT =
# Redis Lua script evaluated by #amount_to_be_flushed.
# KEYS[1] is the increment key, KEYS[2] is the flushed key.
# It adds the value stored at the increment key (0 when the key is missing)
# to the flushed key, deletes the increment key, also deletes the flushed key
# when the resulting total is 0, and returns that total.
<<~LUA
  local increment_key, flushed_key = KEYS[1], KEYS[2]
  local increment_value = redis.call("get", increment_key) or 0
  local flushed_value = redis.call("incrby", flushed_key, increment_value)
  if flushed_value == 0 then
    redis.call("del", increment_key, flushed_key)
  else
    redis.call("del", increment_key)
  end
  return flushed_value
LUA
- LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT =
# Redis Lua script evaluated by #increment and #bulk_increment.
# KEYS: counter key, refresh key, refresh indicator key, tracking shard key,
# opposing tracking shard key, shards set key. ARGV: amount, tracking offset.
# While the refresh indicator key does not exist, the amount is simply added
# to the counter key. While it exists, the tracking bitmaps are consulted so
# that an offset already seen is not counted again, and the net change is
# added to the refresh key instead. Tracking keys get a TTL of
# REFRESH_KEYS_TTL and are registered in the shards set.
<<~LUA
  local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
  local tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6]

  local amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2])

  -- increment to the counter key when not refreshing
  if redis.call("exists", refresh_indicator_key) == 0 then
    return redis.call("incrby", counter_key, amount)
  end

  -- deduplicate and increment to the refresh counter key while refreshing
  local found_duplicate = redis.call("getbit", tracking_shard_key, tracking_offset)
  if found_duplicate == 1 then
    return redis.call("get", refresh_key)
  end

  redis.call("setbit", tracking_shard_key, tracking_offset, 1)
  redis.call("expire", tracking_shard_key, #{REFRESH_KEYS_TTL.seconds})
  redis.call("sadd", shards_key, tracking_shard_key)
  redis.call("expire", shards_key, #{REFRESH_KEYS_TTL.seconds})

  local found_opposing_change = redis.call("getbit", opposing_tracking_shard_key, tracking_offset)
  local increment_without_previous_decrement = amount > 0 and found_opposing_change == 0
  local decrement_with_previous_increment = amount < 0 and found_opposing_change == 1
  local net_change = 0

  if increment_without_previous_decrement or decrement_with_previous_increment then
    net_change = amount
  end

  return redis.call("incrby", refresh_key, net_change)
LUA
- LUA_INITIATE_REFRESH_SCRIPT =
# Redis Lua script evaluated by #initiate_refresh!.
# KEYS[1] is the counter key, KEYS[2] is the refresh indicator key.
# It deletes the counter key and sets the refresh indicator key to 1 with an
# expiry of REFRESH_KEYS_TTL seconds.
<<~LUA
  local counter_key, refresh_indicator_key = KEYS[1], KEYS[2]
  redis.call("del", counter_key)
  redis.call("set", refresh_indicator_key, 1, "ex", #{REFRESH_KEYS_TTL.seconds})
LUA
- LUA_FINALIZE_REFRESH_SCRIPT =
# Redis Lua script evaluated by #finalize_refresh.
# KEYS[1] is the counter key, KEYS[2] the refresh key, KEYS[3] the refresh
# indicator key. It adds the value stored at the refresh key (0 when the key
# is missing) to the counter key, then deletes both the refresh indicator key
# and the refresh key.
<<~LUA
  local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
  local refresh_amount = redis.call("get", refresh_key) or 0
  redis.call("incrby", counter_key, refresh_amount)
  redis.call("del", refresh_indicator_key, refresh_key)
LUA
Constants included from ExclusiveLeaseHelpers
ExclusiveLeaseHelpers::FailedToObtainLockError
Instance Method Summary collapse
- #amount_to_be_flushed ⇒ Object
  amount_to_be_flushed returns the total value to be flushed.
- #bulk_increment(increments) ⇒ Object
- #cleanup_refresh ⇒ Object
- #commit_increment! ⇒ Object
- #finalize_refresh ⇒ Object
- #flushed_key ⇒ Object
- #get ⇒ Object
- #increment(increment) ⇒ Object
- #initialize(counter_record, attribute) ⇒ BufferedCounter (constructor)
  A new instance of BufferedCounter.
- #initiate_refresh! ⇒ Object
- #key ⇒ Object
- #refresh_indicator_key ⇒ Object
- #refresh_key ⇒ Object
Methods included from ExclusiveLeaseHelpers
Constructor Details
#initialize(counter_record, attribute) ⇒ BufferedCounter
Returns a new instance of BufferedCounter.
34 35 36 37 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 34

# Builds a buffered counter bound to one attribute of one record.
#
# @param counter_record the record that owns the counter
# @param attribute the name of the counter attribute on that record
def initialize(counter_record, attribute)
  @counter_record, @attribute = counter_record, attribute
end
Instance Method Details
#amount_to_be_flushed ⇒ Object
amount_to_be_flushed returns the total value to be flushed. The total value is the sum of the following:
- current value in the increment_key
- any existing value in the flushed_key that has not been flushed
165 166 167 168 169 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 165

# Evaluates LUA_FLUSH_INCREMENT_SCRIPT against +key+ and +flushed_key+ and
# returns the script's reply.
#
# Note that this is not a pure read: the script moves the buffered value from
# the increment key into the flushed key and deletes the increment key.
#
# @return the total held in the flushed key after the move
def amount_to_be_flushed
  redis_state do |conn|
    script_keys = [key, flushed_key]
    conn.eval(LUA_FLUSH_INCREMENT_SCRIPT, keys: script_keys)
  end
end
#bulk_increment(increments) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 89

# Applies several increments in one Redis pipeline, then schedules a flush.
#
# Each entry is turned into script arguments by +increment_args+ and sent
# through LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT. Afterwards
# FlushCounterIncrementsWorker is scheduled to run after WORKER_DELAY.
#
# @param increments an enumerable of increments accepted by +increment_args+
# @return [Integer] the last pipeline reply converted with +to_i+
#   (0 when that reply is nil)
def bulk_increment(increments)
  replies = redis_state do |conn|
    conn.pipelined do |batch|
      increments.each do |entry|
        batch.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(entry))
      end
    end
  end

  job_args = [counter_record.class.name, counter_record.id, attribute]
  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, *job_args)

  replies.last.to_i
end
#cleanup_refresh ⇒ Object
134 135 136 137 138 139 140 141 142 143 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 134

# Deletes the tracking keys recorded in the set at +shards_key+, in batches.
#
# Up to CLEANUP_BATCH_SIZE members are popped from the set per round and the
# keys they name are deleted. The loop ends when a round yields fewer members
# than the batch size; otherwise it pauses for CLEANUP_INTERVAL_SECONDS
# before the next round.
def cleanup_refresh
  redis_state do |conn|
    loop do
      popped = conn.spop(shards_key, CLEANUP_BATCH_SIZE)
      break unless popped

      conn.del(*popped)
      # A short batch means the set has been drained.
      break if popped.size < CLEANUP_BATCH_SIZE

      sleep(CLEANUP_INTERVAL_SECONDS)
    end
  end
end
#commit_increment! ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 145

# Flushes the buffered amount into the record while holding the exclusive
# lease, then returns the attribute value re-read from the record.
#
# Inside the lease: the pending amount is obtained from
# +amount_to_be_flushed+. When it is non-zero, the record's counter is
# updated and the flushed key removed within one record transaction, after
# which the record's after-commit callbacks are executed.
#
# @return the value of +attribute+ after +counter_record.reset+
def commit_increment!
  with_exclusive_lease do
    pending = amount_to_be_flushed

    unless pending == 0
      counter_record.transaction do
        counter_record.update_counters_with_lease({ attribute => pending })
        remove_flushed_key
      end

      counter_record.execute_after_commit_callbacks
    end
  end

  counter_record.reset.read_attribute(attribute)
end
#finalize_refresh ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 125

# Ends a refresh: folds the refresh total into the counter key and queues the
# follow-up jobs.
#
# LUA_FINALIZE_REFRESH_SCRIPT adds the refresh key's value to the counter key
# and deletes the refresh and refresh-indicator keys. A flush is then
# scheduled after WORKER_DELAY and a cleanup job is enqueued immediately.
def finalize_refresh
  redis_state do |conn|
    script_keys = [key, refresh_key, refresh_indicator_key]
    conn.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: script_keys)
  end

  job_args = [counter_record.class.name, counter_record.id, attribute]
  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, *job_args)
  ::Counters::CleanupRefreshWorker.perform_async(*job_args)
end
#flushed_key ⇒ Object
179 180 181 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 179

# Name of the Redis key holding the amount that has been taken out of the
# increment key but not yet removed after a flush.
#
# @return [String] +key+ suffixed with ":flushed"
def flushed_key
  base = key
  "#{base}:flushed"
end
#get ⇒ Object
39 40 41 42 43 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 39

# Reads the value currently stored at +key+ in Redis.
#
# @return [Integer] the stored value converted with +to_i+
#   (0 when nothing is stored)
def get
  redis_state do |conn|
    stored = conn.get(key)
    stored.to_i
  end
end
#increment(increment) ⇒ Object
79 80 81 82 83 84 85 86 87 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 79

# Applies a single increment through LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT
# and schedules FlushCounterIncrementsWorker to run after WORKER_DELAY.
#
# @param increment an increment accepted by +increment_args+
# @return [Integer] the script's reply converted with +to_i+
def increment(increment)
  script_reply = redis_state do |conn|
    reply = conn.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment))
    reply.to_i
  end

  job_args = [counter_record.class.name, counter_record.id, attribute]
  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, *job_args)

  script_reply
end
#initiate_refresh! ⇒ Object
109 110 111 112 113 114 115 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 109

# Starts a refresh of the counter.
#
# The record's attribute is first reset to 0 via +update!+. Then
# LUA_INITIATE_REFRESH_SCRIPT deletes the counter key and sets the
# refresh-indicator key so that later increments are routed to the refresh
# key.
def initiate_refresh!
  counter_record.update!(attribute => 0)

  redis_state do |conn|
    script_keys = [key, refresh_indicator_key]
    conn.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: script_keys)
  end
end
#key ⇒ Object
171 172 173 174 175 176 177 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 171

# Name of the Redis key that buffers increments for this record attribute.
#
# The key is composed of the id of the record's project, the record's class,
# the record's id and the attribute name.
# NOTE(review): the braces around the project id look like a Redis Cluster
# hash tag that keeps related keys in one slot — confirm.
#
# @return [String]
def key
  "project:{#{counter_record.project.id}}:counters:#{counter_record.class}:#{counter_record.id}:#{attribute}"
end
#refresh_indicator_key ⇒ Object
183 184 185 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 183

# Name of the Redis key whose existence marks a refresh as in progress.
#
# @return [String] +key+ suffixed with ":refresh-in-progress"
def refresh_indicator_key
  base = key
  "#{base}:refresh-in-progress"
end
#refresh_key ⇒ Object
187 188 189 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 187

# Name of the Redis key that accumulates increments while a refresh is in
# progress.
#
# @return [String] +key+ suffixed with ":refresh"
def refresh_key
  base = key
  "#{base}:refresh"
end