Class: ArAsyncCounterCache::IncrementCountersWorker
- Inherits: Object
  - Object
  - ArAsyncCounterCache::IncrementCountersWorker
- Extended by: Resque::Plugins::LockTimeout
- Defined in: lib/ar_async_counter_cache/increment_counters_worker.rb
Overview
ArAsyncCounterCache increments a counter in Redis right away; a Resque job later applies the accumulated delta to the counter cache column in the database. Using resque-lock-timeout, we can ensure that only one job per identifier (parent class, id, column) is running at a time.
Class Method Summary
- .around_perform_lock1(*args) ⇒ Object
  The name of this method ensures that it runs within around_perform_lock.
- .cache_and_enqueue(parent_class, id, column, direction) ⇒ Object
- .cache_key(*args) ⇒ Object
  args: (parent_class, id, column).
- .identifier(*args) ⇒ Object
  args: (parent_class, id, column).
- .lock_failed(*args) ⇒ Object
  Try again later if lock is in use.
- .perform(parent_class, id, column) ⇒ Object
- .redis ⇒ Object
Class Method Details
.around_perform_lock1(*args) ⇒ Object
The name of this method ensures that it runs within around_perform_lock. Resque runs around_perform hooks in alphabetical order, so around_perform_lock1 sorts after around_perform_lock (defined by resque-lock-timeout) and is therefore invoked inside it.
We’ve leveraged resque-lock-timeout to ensure that only one job is running at a time. This around filter additionally ensures that only one job per parent-column can sit on the queue at once. Since the cache_key entry in Redis stores the most up-to-date delta for the parent’s counter cache, we don’t have to perform Klass.update_counters for every increment/decrement. We can batch process!
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 79
def self.around_perform_lock1(*args)
  # Remove all other instances of this job (with the same args) from the
  # queue. Uses LREM (http://code.google.com/p/redis/wiki/LremCommand) which
  # takes the form: "LREM key count value" and if count == 0 removes all
  # instances of value from the list.
  redis_job_value = ::Resque.encode(:class => self.to_s, :args => args)
  ::Resque.redis.lrem("queue:#{@queue}", 0, redis_job_value)
  yield
end
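The batching described above can be illustrated without Redis or Resque. The following is a minimal sketch, assuming an in-memory Hash in place of the Redis counters and an Array in place of the Resque queue; FakeCounterQueue and its methods are hypothetical names introduced only for this illustration and are not part of the library.

```ruby
# Hypothetical in-memory stand-in for the Redis counters and the Resque queue.
class FakeCounterQueue
  attr_reader :counters, :queue, :updates

  def initialize
    @counters = Hash.new(0) # cache key => pending delta
    @queue    = []          # job arguments, standing in for the Redis list
    @updates  = []          # one entry per simulated update_counters call
  end

  # Roughly what cache_and_enqueue does: adjust the delta, then push a job.
  # (Unlike the real method, any direction other than :increment is
  # treated as a decrement here.)
  def cache_and_enqueue(parent_class, id, column, direction)
    key = [parent_class, id, column].join('-')
    @counters[key] += (direction == :increment ? 1 : -1)
    @queue << [parent_class, id, column]
  end

  # One worker run: take a job, drop its queued duplicates, apply the delta once.
  def work_one
    args = @queue.shift
    return false unless args

    @queue.delete(args) # plays the role of "LREM key 0 value"
    key = args.join('-')
    delta = @counters[key]
    @counters[key] = 0
    @updates << [args, delta] unless delta.zero?
    true
  end
end

q = FakeCounterQueue.new
3.times { q.cache_and_enqueue('Post', 1, 'comments_count', :increment) }
q.cache_and_enqueue('Post', 1, 'comments_count', :decrement)
q.work_one
```

Four changes were enqueued, but a single worker run leaves the queue empty and records one update with a net delta of 2, which is the effect the around filter is aiming for.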
.cache_and_enqueue(parent_class, id, column, direction) ⇒ Object
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 38
def self.cache_and_enqueue(parent_class, id, column, direction)
  parent_class = parent_class.to_s
  key = cache_key(parent_class, id, column)
  if direction == :increment
    redis.incr(key)
  elsif direction == :decrement
    redis.decr(key)
  else
    raise ArgumentError, "Must call ArAsyncCounterCache::IncrementCountersWorker with :increment or :decrement"
  end
  ::Resque.enqueue(self, parent_class, id, column)
end
.cache_key(*args) ⇒ Object
args: (parent_class, id, column)
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 66
def self.cache_key(*args)
  "ar-async-counter-cache:#{identifier(*args)}"
end
.identifier(*args) ⇒ Object
args: (parent_class, id, column)
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 56
def self.identifier(*args)
  args.join('-')
end
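To make the resulting key format concrete, here is a small sketch that copies the bodies of identifier and cache_key into a standalone module. KeySketch is a hypothetical name, and 'Post', 42, and 'comments_count' are made-up example arguments.

```ruby
# Standalone copies of the two helpers, for illustration only.
module KeySketch
  module_function

  # Joins (parent_class, id, column) with dashes.
  def identifier(*args)
    args.join('-')
  end

  # Prefixes the identifier to form the Redis key.
  def cache_key(*args)
    "ar-async-counter-cache:#{identifier(*args)}"
  end
end

key = KeySketch.cache_key('Post', 42, 'comments_count')
puts key # prints "ar-async-counter-cache:Post-42-comments_count"
```

Because the identifier is built from all three arguments, each parent record and column pair gets its own Redis key.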
.lock_failed(*args) ⇒ Object
Try again later if lock is in use.
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 61
def self.lock_failed(*args)
  ::Resque.enqueue(self, *args)
end
.perform(parent_class, id, column) ⇒ Object
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 89
def self.perform(parent_class, id, column)
  key = cache_key(parent_class, id, column)
  if (delta = redis.getset(key, 0).to_i) != 0
    begin
      parent_class = ::Resque.constantize(parent_class)
      parent_class.find(id)
      parent_class.update_counters(id, column => delta)
    rescue Exception => e
      # If anything happens, set back the counter cache.
      if delta > 0
        redis.incrby(key, delta)
      elsif delta < 0
        redis.decrby(key, -delta)
      end
      raise e
    end
  end
end
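The read-and-reset followed by a restore on failure can be sketched in isolation. This is a minimal illustration, assuming an in-memory FakeRedis class that imitates only the four calls used here; FakeRedis and apply_delta are hypothetical names, and the block passed to apply_delta stands in for the update_counters call.

```ruby
# Hypothetical in-memory imitation of the few Redis calls used by perform.
class FakeRedis
  def initialize
    @data = Hash.new(0)
  end

  # Returns the old value and stores the new one, like GETSET.
  def getset(key, value)
    old = @data[key]
    @data[key] = value
    old
  end

  def incrby(key, amount)
    @data[key] += amount
  end

  def decrby(key, amount)
    @data[key] -= amount
  end

  def get(key)
    @data[key]
  end
end

# Takes the pending delta, yields it to the caller, and puts it back on error.
def apply_delta(redis, key)
  delta = redis.getset(key, 0).to_i
  return 0 if delta.zero?

  begin
    yield delta
  rescue StandardError
    # Restore the delta so a later job can retry it.
    if delta > 0
      redis.incrby(key, delta)
    else
      redis.decrby(key, -delta)
    end
    raise
  end
  delta
end

redis = FakeRedis.new
redis.incrby('k', 3)

begin
  apply_delta(redis, 'k') { |_delta| raise 'database unavailable' }
rescue RuntimeError
  # The failed attempt has restored the pending delta.
end
restored = redis.get('k')

applied = apply_delta(redis, 'k') { |delta| delta }
```

Resetting the key to 0 before the update means that increments arriving while the update is in progress accumulate on top of 0 and are picked up by the next job, and restoring with incrby or decrby (rather than overwriting) keeps those later increments intact.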
.redis ⇒ Object
# File 'lib/ar_async_counter_cache/increment_counters_worker.rb', line 51
def self.redis
  @redis || ::Resque.redis
end