Module: CachedCounting

Extended by:
ActiveSupport::Concern
Included in:
ApplicationRequest, WebCrawlerRequest
Defined in:
app/models/concerns/cached_counting.rb

Constant Summary

# Lua script: reads field KEYS[2] of the hash KEYS[1], deletes that field,
# and returns the value that was read.
LUA_HGET_DEL = DiscourseRedis::EvalHelper.new <<~LUA
  local result = redis.call("HGET", KEYS[1], KEYS[2])
  redis.call("HDEL", KEYS[1], KEYS[2])

  return result
LUA

# In-process queue of pending increments; filled by .queue and drained by
# .flush_in_memory.
QUEUE = Queue.new

# How long .thread_loop sleeps between passes.
SLEEP_SECONDS = 1

# Number of .thread_loop passes after which .flush_to_db is invoked.
FLUSH_DB_ITERATIONS = 60

# Guards the thread bookkeeping done in .ensure_thread!.
MUTEX = Mutex.new

# Minimum number of seconds between two thread checks in .ensure_thread!.
ENSURE_THREAD_COOLDOWN_SECONDS = 5

# Name of the Redis hash in which counts accumulate before being written out.
COUNTER_REDIS_HASH = "CounterCacheHash"

# Expiry (in seconds) given to DB_COOLDOWN_KEY by .allowed_to_flush_to_db?.
DB_FLUSH_COOLDOWN_SECONDS = 60

# Redis key used as the "a database flush happened recently" marker.
DB_COOLDOWN_KEY = "cached_counting_cooldown"

Class Method Summary

Class Method Details

.allowed_to_flush_to_db? ⇒ Boolean

Returns:

  • (Boolean)


151
152
153
154
155
156
157
158
# File 'app/models/concerns/cached_counting.rb', line 151

# Attempts to claim the database-flush cooldown marker.
#
# Issues a Redis SET on DB_COOLDOWN_KEY with `nx: true` (only set when the key
# is absent) and an expiry of DB_FLUSH_COOLDOWN_SECONDS, and returns whatever
# the client returns for that call -- presumably truthy only when the key was
# newly set (verify against the Redis client in use).
def self.allowed_to_flush_to_db?
  redis = Discourse.redis.without_namespace
  redis.set(DB_COOLDOWN_KEY, "1", ex: DB_FLUSH_COOLDOWN_SECONDS, nx: true)
end

.clear_flush_to_db_lock! ⇒ Object



143
144
145
# File 'app/models/concerns/cached_counting.rb', line 143

# Deletes DB_COOLDOWN_KEY, i.e. the marker that .allowed_to_flush_to_db? sets.
# Returns the result of the Redis DEL call.
def self.clear_flush_to_db_lock!
  redis = Discourse.redis.without_namespace
  redis.del(DB_COOLDOWN_KEY)
end

.clear_queue! ⇒ Object



164
165
166
167
168
# File 'app/models/concerns/cached_counting.rb', line 164

# Discards every pending count: empties the in-process QUEUE first, then
# deletes the COUNTER_REDIS_HASH hash from Redis.
# Returns the result of the Redis DEL call.
def self.clear_queue!
  QUEUE.clear
  Discourse.redis.without_namespace.del(COUNTER_REDIS_HASH)
end

.disable ⇒ Object



18
19
20
21
22
23
24
# File 'app/models/concerns/cached_counting.rb', line 18

# Turns cached counting off and, when a worker thread is alive, wakes it and
# waits for it to finish (the worker loop exits once .enabled? is false).
#
# Returns nil when there was no live thread, otherwise the result of
# Thread#join.
def self.disable
  @enabled = false
  return unless @thread&.alive?

  # Cut the worker's sleep short so it notices the flag right away.
  @thread.wakeup
  @thread.join
end

.enable ⇒ Object



30
31
32
# File 'app/models/concerns/cached_counting.rb', line 30

# Turns cached counting (back) on by setting the @enabled flag that .enabled?
# reads. Does not start a thread by itself; see .ensure_thread!.
def self.enable
  @enabled = true
end

.enabled? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
# File 'app/models/concerns/cached_counting.rb', line 26

# Whether cached counting is on. Only an explicit `false` (as set by .disable)
# switches it off; the never-assigned (nil) state counts as enabled.
def self.enabled?
  !(@enabled == false)
end

.ensure_thread! ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'app/models/concerns/cached_counting.rb', line 42

# Makes sure a worker thread running .thread_loop exists, (re)creating it when
# it is missing or dead.
#
# Does nothing when counting is disabled. Under MUTEX, the check is skipped
# (returning nil) if the previous check ran less than
# ENSURE_THREAD_COOLDOWN_SECONDS ago, measured on the monotonic clock.
# Otherwise returns the worker thread.
def self.ensure_thread!
  return unless enabled?

  MUTEX.synchronize do
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    previous = @last_ensure_thread

    # creating threads can be very expensive and bog down a process
    return if previous && (now - previous) < ENSURE_THREAD_COOLDOWN_SECONDS

    @last_ensure_thread = now

    # Forget a dead (or never-started) worker, then start one if needed.
    @thread = nil unless @thread&.alive?
    @thread ||= Thread.new { thread_loop }
  end
end

.flush ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'app/models/concerns/cached_counting.rb', line 85

# Forces all pending counts out: first from the in-process queue into Redis,
# then from Redis into the database.
#
# When a worker thread is alive, the work is delegated to it: the @flush flag
# is raised, the worker is woken, and this method polls until the worker
# clears the flag. If the worker exits before clearing it (because it was
# disabled or hit an error), the request is taken back and performed inline
# instead of waiting forever. Without a live worker the flush is always done
# inline on the calling thread.
#
# Returns nil when the worker serviced the request, otherwise the result of
# .flush_to_db.
def self.flush
  # Snapshot the ivar: it can be reassigned by .ensure_thread! concurrently.
  worker = @thread

  if worker&.alive?
    @flush = true

    begin
      worker.wakeup
    rescue ThreadError
      # The worker finished between the alive? check and the wakeup call;
      # the liveness check below handles it.
    end

    sleep 0.001 while @flush && worker.alive?
    return unless @flush

    # The worker died with the request still pending: reclaim it.
    @flush = false
  end

  flush_in_memory
  flush_to_db
end

.flush_in_memory ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'app/models/concerns/cached_counting.rb', line 98

# Drains the in-process QUEUE and adds the tallied counts to the
# COUNTER_REDIS_HASH hash in Redis.
#
# Every queue entry is a [key, klass, db, time] tuple (see .queue). Entries
# are grouped under the hash field "klass,db,YYYYMMDD,key" and each field is
# incremented once by its tally via HINCRBY.
#
# Returns nil when the queue was empty, otherwise the Hash of field => tally
# that was sent to Redis.
def self.flush_in_memory
  counts = Hash.new(0)

  loop do
    key, klass, db, time =
      begin
        # Non-blocking pop: .flush can run this inline while a worker thread
        # is also draining, so a "length > 0, then blocking deq" sequence
        # could block forever once the other consumer takes the last entry.
        QUEUE.pop(true)
      rescue ThreadError
        break # queue is empty
      end

    counts["#{klass},#{db},#{time.strftime("%Y%m%d")},#{key}"] += 1
  end

  return if counts.empty?

  redis = Discourse.redis.without_namespace
  counts.each { |redis_key, count| redis.hincrby(COUNTER_REDIS_HASH, redis_key, count) }
end

.flush_to_db ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'app/models/concerns/cached_counting.rb', line 118

# Moves the counts accumulated in the COUNTER_REDIS_HASH hash into the
# database.
#
# Runs under the "flush_counters_to_db" DistributedMutex and only when
# .allowed_to_flush_to_db? grants the cooldown slot. For every field of the
# hash the value is fetched and removed in one step by LUA_HGET_DEL; the field
# name is then split back into class name, db, date and key (the format
# written by .flush_in_memory) and handed to `klass.write_cache!` inside that
# db's connection.
def self.flush_to_db
  redis = Discourse.redis.without_namespace

  DistributedMutex.synchronize("flush_counters_to_db", redis: redis, validity: 5.minutes) do
    next unless allowed_to_flush_to_db?

    redis.hkeys(COUNTER_REDIS_HASH).each do |field|
      count = LUA_HGET_DEL.eval(redis, [COUNTER_REDIS_HASH, field]).to_i

      # unlikely (protected by mutex), but protect just in case
      # could be a race condition in test
      next unless count > 0

      # Limit of 4 keeps any commas inside the trailing key intact.
      klass_name, db, day, local_key = field.split(",", 4)
      date = Date.strptime(day, "%Y%m%d")
      klass = Module.const_get(klass_name)

      RailsMultisite::ConnectionManagement.with_connection(db) do
        klass.write_cache!(local_key, count, date)
      end
    end
  end
end

.flush_to_db_lock_ttl ⇒ Object



147
148
149
# File 'app/models/concerns/cached_counting.rb', line 147

# Returns the result of the Redis TTL call for DB_COOLDOWN_KEY, the marker set
# by .allowed_to_flush_to_db?.
def self.flush_to_db_lock_ttl
  redis = Discourse.redis.without_namespace
  redis.ttl(DB_COOLDOWN_KEY)
end

.queue(key, klass) ⇒ Object



160
161
162
# File 'app/models/concerns/cached_counting.rb', line 160

# Records one pending increment for `key` on behalf of `klass`.
#
# Pushes a [key, klass, current multisite db, current UTC time] tuple onto the
# in-process QUEUE; .flush_in_memory later drains and tallies these tuples.
# Returns QUEUE.
def self.queue(key, klass)
  db = RailsMultisite::ConnectionManagement.current_db
  QUEUE << [key, klass, db, Time.now.utc]
end

.reset ⇒ Object



34
35
36
37
38
# File 'app/models/concerns/cached_counting.rb', line 34

# Returns the module to a clean state: forgets when .ensure_thread! last ran
# (so its cooldown no longer applies), drops all pending counts via
# .clear_queue!, and removes the database-flush cooldown marker via
# .clear_flush_to_db_lock!.
def self.reset
  @last_ensure_thread = nil
  clear_queue!
  clear_flush_to_db_lock!
end

.thread_loop ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'app/models/concerns/cached_counting.rb', line 61

# Body of the worker thread.
#
# While counting is enabled, each pass sleeps SLEEP_SECONDS, pushes queued
# counts to Redis via .flush_in_memory, and -- once FLUSH_DB_ITERATIONS passes
# have accumulated or a flush was requested through @flush -- writes them to
# the database via .flush_to_db and clears @flush.
#
# Any StandardError ends the loop. Redis READONLY command errors and
# PG::ReadOnlySqlTransaction are swallowed silently; everything else is
# reported through Discourse.warn_exception.
def self.thread_loop
  passes = 0

  while enabled?
    sleep SLEEP_SECONDS
    flush_in_memory

    if @flush || passes >= FLUSH_DB_ITERATIONS
      passes = 0
      flush_to_db
      @flush = false
    end

    passes += 1
  end
rescue => error
  # do not warn for Redis readonly mode or PG readonly mode
  readonly =
    (error.is_a?(Redis::CommandError) && error.message.match?(/READONLY/)) ||
      error.is_a?(PG::ReadOnlySqlTransaction)

  unless readonly
    Discourse.warn_exception(error, message: "Unexpected error while processing cached counts")
  end
end