Module: CachedCounting
- Extended by:
- ActiveSupport::Concern
- Included in:
- ApplicationRequest, WebCrawlerRequest
- Defined in:
- app/models/concerns/cached_counting.rb
Constant Summary collapse
- LUA_HGET_DEL =
DiscourseRedis::EvalHelper.new <<~LUA
  local result = redis.call("HGET", KEYS[1], KEYS[2])
  redis.call("HDEL", KEYS[1], KEYS[2])
  return result
LUA
- QUEUE =
Queue.new
- SLEEP_SECONDS =
1
- FLUSH_DB_ITERATIONS =
60
- MUTEX =
Mutex.new
- ENSURE_THREAD_COOLDOWN_SECONDS =
5
- COUNTER_REDIS_HASH =
"CounterCacheHash"
- DB_FLUSH_COOLDOWN_SECONDS =
60
- DB_COOLDOWN_KEY =
"cached_counting_cooldown"
Class Method Summary collapse
- .allowed_to_flush_to_db? ⇒ Boolean
- .clear_flush_to_db_lock! ⇒ Object
- .clear_queue! ⇒ Object
- .disable ⇒ Object
- .enable ⇒ Object
- .enabled? ⇒ Boolean
- .ensure_thread! ⇒ Object
- .flush ⇒ Object
- .flush_in_memory ⇒ Object
- .flush_to_db ⇒ Object
- .flush_to_db_lock_ttl ⇒ Object
- .queue(key, klass) ⇒ Object
- .reset ⇒ Object
- .thread_loop ⇒ Object
Class Method Details
.allowed_to_flush_to_db? ⇒ Boolean
151 152 153 154 155 156 157 158 |
# File 'app/models/concerns/cached_counting.rb', line 151
#
# Attempts to set DB_COOLDOWN_KEY on the un-namespaced Redis connection using
# NX (set only when the key is absent) with an expiry of
# DB_FLUSH_COOLDOWN_SECONDS seconds.
#
# @return [Boolean] whatever the Redis SET call returns; presumably true only
#   when this caller created the key (confirm against the Redis client in use)
def self.allowed_to_flush_to_db?
  redis = Discourse.redis.without_namespace
  redis.set(DB_COOLDOWN_KEY, "1", ex: DB_FLUSH_COOLDOWN_SECONDS, nx: true)
end
.clear_flush_to_db_lock! ⇒ Object
143 144 145 |
# File 'app/models/concerns/cached_counting.rb', line 143
#
# Deletes DB_COOLDOWN_KEY from the un-namespaced Redis connection, removing
# the cooldown marker that allowed_to_flush_to_db? sets.
#
# @return [Object] the result of the Redis DEL call
def self.clear_flush_to_db_lock!
  redis = Discourse.redis.without_namespace
  redis.del(DB_COOLDOWN_KEY)
end
.clear_queue! ⇒ Object
164 165 166 167 168 |
# File 'app/models/concerns/cached_counting.rb', line 164
#
# Discards all pending counts: empties the in-process QUEUE and deletes the
# COUNTER_REDIS_HASH hash from the un-namespaced Redis connection.
#
# @return [Object] the result of the Redis DEL call
def self.clear_queue!
  QUEUE.clear
  Discourse.redis.without_namespace.del(COUNTER_REDIS_HASH)
end
.disable ⇒ Object
18 19 20 21 22 23 24 |
# File 'app/models/concerns/cached_counting.rb', line 18
#
# Marks counting as disabled and, when a background thread is still alive,
# wakes it from its sleep and waits for it to finish.
#
# @return [Thread, nil] the joined thread, or nil when no live thread existed
def self.disable
  @enabled = false
  return unless @thread&.alive?

  # Interrupt the thread's sleep so it can notice the flag and exit its loop.
  @thread.wakeup
  @thread.join
end
.enable ⇒ Object
30 31 32 |
# File 'app/models/concerns/cached_counting.rb', line 30
#
# Marks counting as enabled by setting the @enabled flag.
#
# @return [true]
def self.enable
  @enabled = true
end
.enabled? ⇒ Boolean
26 27 28 |
# File 'app/models/concerns/cached_counting.rb', line 26
#
# Reports whether counting is enabled. Only an explicit `false` in @enabled
# disables it, so the initial unset (nil) state counts as enabled.
#
# @return [Boolean]
def self.enabled?
  @enabled.nil? || !!@enabled
end
.ensure_thread! ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'app/models/concerns/cached_counting.rb', line 42
#
# Makes sure a background thread running thread_loop exists, starting one if
# the current thread is missing or dead. Does nothing when counting is
# disabled, and performs the check at most once per
# ENSURE_THREAD_COOLDOWN_SECONDS (measured on the monotonic clock).
#
# @return [Thread, nil] the live worker thread, or nil when disabled or when
#   the call fell inside the cooldown window
def self.ensure_thread!
  return unless enabled?

  MUTEX.synchronize do
    current = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    elapsed = @last_ensure_thread && (current - @last_ensure_thread)

    # creating threads can be very expensive and bog down a process
    next if elapsed && elapsed < ENSURE_THREAD_COOLDOWN_SECONDS

    @last_ensure_thread = current

    # Drop a dead (or never started) thread so a replacement gets created.
    @thread = nil unless @thread&.alive?
    @thread ||= Thread.new { thread_loop }
  end
end
.flush ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'app/models/concerns/cached_counting.rb', line 85
#
# Forces pending counts to be flushed and waits until that has happened.
#
# When a live background thread exists, the work is delegated to it: @flush is
# set, the thread is woken, and this method polls until the thread clears
# @flush. Otherwise flush_in_memory and flush_to_db run inline on the caller.
#
# The wait is bounded by the worker's lifetime. If the worker exits before
# clearing @flush (thread_loop terminates on any rescued error), the flush is
# performed inline instead of polling forever.
#
# @return [Object, nil] nil when the worker serviced the request, otherwise
#   the result of flush_to_db
def self.flush
  worker = @thread

  unless worker&.alive?
    flush_in_memory
    return flush_to_db
  end

  @flush = true
  begin
    worker.wakeup
  rescue ThreadError
    # The worker died between the liveness check and the wakeup; the loop
    # below sees it is no longer alive and we fall through to the inline flush.
  end

  sleep 0.001 while @flush && worker.alive?
  return unless @flush

  # The worker exited without servicing the request: take over its job.
  @flush = false
  flush_in_memory
  flush_to_db
end
.flush_in_memory ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'app/models/concerns/cached_counting.rb', line 98
#
# Drains the in-process QUEUE, tallies the entries per
# "klass,db,YYYYMMDD,key" field, and adds each tally to COUNTER_REDIS_HASH on
# the un-namespaced Redis connection via HINCRBY.
#
# @return [Hash{String => Integer}, nil] the tallies that were written, or nil
#   when the queue was empty
def self.flush_in_memory
  tallies = Hash.new(0)

  until QUEUE.empty?
    # only 1 consumer, no need to avoid blocking
    key, klass, db, time = QUEUE.deq
    tallies["#{klass},#{db},#{time.strftime("%Y%m%d")},#{key}"] += 1
  end

  return if tallies.empty?

  tallies.each do |field, count|
    Discourse.redis.without_namespace.hincrby(COUNTER_REDIS_HASH, field, count)
  end
end
.flush_to_db ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'app/models/concerns/cached_counting.rb', line 118
#
# Moves the counters accumulated in COUNTER_REDIS_HASH into the database.
#
# Runs under the "flush_counters_to_db" DistributedMutex and only proceeds
# when allowed_to_flush_to_db? grants the cooldown slot. Each hash field is
# read and deleted in one step through LUA_HGET_DEL, split back into
# class name, db, date and key, and handed to `klass.write_cache!` inside the
# matching multisite connection.
#
# @return [Object] the result of DistributedMutex.synchronize
def self.flush_to_db
  redis = Discourse.redis.without_namespace

  DistributedMutex.synchronize("flush_counters_to_db", redis: redis, validity: 5.minutes) do
    next unless allowed_to_flush_to_db?

    redis.hkeys(COUNTER_REDIS_HASH).each do |field|
      count = LUA_HGET_DEL.eval(redis, [COUNTER_REDIS_HASH, field]).to_i

      # unlikely (protected by mutex), but protect just in case
      # could be a race condition in test
      next unless count > 0

      # Limit of 4 keeps any commas in the trailing key part intact.
      klass_name, db, date_string, local_key = field.split(",", 4)
      date = Date.strptime(date_string, "%Y%m%d")
      klass = Module.const_get(klass_name)

      RailsMultisite::ConnectionManagement.with_connection(db) do
        klass.write_cache!(local_key, count, date)
      end
    end
  end
end
.flush_to_db_lock_ttl ⇒ Object
147 148 149 |
# File 'app/models/concerns/cached_counting.rb', line 147
#
# Looks up the remaining time-to-live of DB_COOLDOWN_KEY on the un-namespaced
# Redis connection.
#
# @return [Object] the result of the Redis TTL call
def self.flush_to_db_lock_ttl
  redis = Discourse.redis.without_namespace
  redis.ttl(DB_COOLDOWN_KEY)
end
.queue(key, klass) ⇒ Object
160 161 162 |
# File 'app/models/concerns/cached_counting.rb', line 160
#
# Records one hit for `key` by pushing an entry onto the in-process QUEUE.
# The entry captures the current multisite db and the current UTC time.
#
# @param key [Object] counter key; interpolated into the Redis field name by
#   flush_in_memory
# @param klass [Object] owner of the counter; interpolated into the Redis
#   field name by flush_in_memory
# @return [Thread::Queue] QUEUE
def self.queue(key, klass)
  current_db = RailsMultisite::ConnectionManagement.current_db
  QUEUE << [key, klass, current_db, Time.now.utc]
end
.reset ⇒ Object
34 35 36 37 38 |
# File 'app/models/concerns/cached_counting.rb', line 34
#
# Restores a clean state: forgets when ensure_thread! last ran, discards all
# queued and Redis-held counts, and removes the DB flush cooldown key.
#
# @return [Object] the result of clear_flush_to_db_lock!
def self.reset
  # With no timestamp, the next ensure_thread! call skips its cooldown check.
  @last_ensure_thread = nil

  clear_queue!
  clear_flush_to_db_lock!
end
.thread_loop ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'app/models/concerns/cached_counting.rb', line 61
#
# Body of the background thread. While counting is enabled it repeatedly
# sleeps SLEEP_SECONDS, pushes queued counts to Redis (flush_in_memory), and
# flushes Redis to the database (flush_to_db) either once FLUSH_DB_ITERATIONS
# iterations have passed or as soon as @flush has been set.
#
# Any StandardError ends the loop. It is reported through
# Discourse.warn_exception unless it is a Redis READONLY command error or a
# PG read-only transaction error.
#
# @return [Object, nil]
def self.thread_loop
  iterations = 0

  while enabled?
    sleep SLEEP_SECONDS
    flush_in_memory

    if iterations >= FLUSH_DB_ITERATIONS || @flush
      iterations = 0
      flush_to_db
      # Signals a caller polling on @flush that its request was serviced.
      @flush = false
    end

    iterations += 1
  end
rescue => ex
  # do not warn for Redis readonly mode
  return if Redis::CommandError === ex && ex.message.match?(/READONLY/)
  # do not warn for PG readonly mode
  return if PG::ReadOnlySqlTransaction === ex

  Discourse.warn_exception(ex, message: "Unexpected error while processing cached counts")
end