Module: Resque::Plugins::LockTimeout
- Defined in:
- lib/resque/plugins/lock_timeout.rb
Overview
If you want only one instance of your job running at a time, extend it with this module:
require ‘resque-lock-timeout’
class UpdateNetworkGraph
extend Resque::Plugins::LockTimeout
@queue = :network_graph
def self.perform(repo_id)
heavy_lifting
end
end
If you wish to limit the durati on a lock may be held for, you can set/override ‘lock_timeout`. e.g.
class UpdateNetworkGraph
extend Resque::Plugins::LockTimeout
@queue = :network_graph
# lock may be held for upto an hour.
@lock_timeout = 3600
def self.perform(repo_id)
heavy_lifting
end
end
Instance Method Summary collapse
-
#acquire_lock!(*args) ⇒ Boolean, Fixnum
Try to acquire a lock.
-
#acquire_lock_algorithm!(lock_key, *args) ⇒ Object
Attempts to aquire the lock using a timeout / deadlock algorithm.
-
#around_perform_lock(*args) ⇒ Object
Where the magic happens.
-
#before_enqueue_lock(*args) ⇒ Object
Try to acquire lock before enqueueing job.
-
#identifier(*args) ⇒ String?
abstract
Builds an identifier using the job arguments.
-
#lock_expired_before_release(*args) ⇒ Object
abstract
Hook method; called when the lock expired before we released it.
-
#lock_failed(*args) ⇒ Object
abstract
Hook method; called when a were unable to aquire the lock.
-
#lock_redis ⇒ Redis
Override to fully control the redis object used for storing the locks.
-
#lock_timeout(*args) ⇒ Fixnum
Number of seconds the lock may be held for.
-
#locked?(*args) ⇒ Boolean
Convenience method, not used internally.
-
#redis_lock_key(*args) ⇒ String
Override to fully control the lock key used.
-
#refresh_lock!(*args) ⇒ Object
Refresh the lock.
-
#release_lock!(*args) ⇒ Object
Release the lock.
Instance Method Details
#acquire_lock!(*args) ⇒ Boolean, Fixnum
Try to acquire a lock.
-
Returns false; when unable to acquire the lock.
-
Returns true; when lock acquired, without a timeout.
-
Returns timestamp; when lock acquired with a timeout, timestamp is when the lock timeout expires.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/resque/plugins/lock_timeout.rb', line 106 def acquire_lock!(*args) acquired = false lock_key = redis_lock_key(*args) unless lock_timeout(*args) > 0 # Acquire without using a timeout. acquired = true if lock_redis.setnx(lock_key, true) else # Acquire using the timeout algorithm. acquired, lock_until = acquire_lock_algorithm!(lock_key, *args) end lock_failed(*args) if !acquired lock_until && acquired ? lock_until : acquired end |
#acquire_lock_algorithm!(lock_key, *args) ⇒ Object
Attempts to aquire the lock using a timeout / deadlock algorithm.
Locking algorithm: code.google.com/p/redis/wiki/SetnxCommand
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/resque/plugins/lock_timeout.rb', line 128 def acquire_lock_algorithm!(lock_key, *args) now = Time.now.to_i lock_until = now + lock_timeout(*args) acquired = false return [true, lock_until] if lock_redis.setnx(lock_key, lock_until) # Can't acquire the lock, see if it has expired. lock_expiration = lock_redis.get(lock_key) if lock_expiration && lock_expiration.to_i < now # expired, try to acquire. lock_expiration = lock_redis.getset(lock_key, lock_until) if lock_expiration.nil? || lock_expiration.to_i < now acquired = true end else # Try once more... acquired = true if lock_redis.setnx(lock_key, lock_until) end [acquired, lock_until] end |
#around_perform_lock(*args) ⇒ Object
Where the magic happens.
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/resque/plugins/lock_timeout.rb', line 176 def around_perform_lock(*args) begin yield ensure # Release the lock on success and error. Unless a lock_timeout is # used, then we need to be more careful before releasing the lock. if lock_timeout(*args) > 0 lock_until = lock_redis.get(redis_lock_key(*args)).to_i now = Time.now.to_i if lock_until < now # Eeek! Lock expired before perform finished. Trigger callback. lock_expired_before_release(*args) return # dont relase lock. end end release_lock!(*args) end end |
#before_enqueue_lock(*args) ⇒ Object
Try to acquire lock before enqueueing job. If lock can’t be acquired, the job won’t be enqueued
169 170 171 |
# File 'lib/resque/plugins/lock_timeout.rb', line 169 def before_enqueue_lock(*args) acquire_lock!(*args) end |
#identifier(*args) ⇒ String?
You may override to implement a custom identifier, you should consider doing this if your job arguments are many/long or may not cleanly cleanly to strings.
Builds an identifier using the job arguments. This identifier is used as part of the redis lock key.
42 43 44 |
# File 'lib/resque/plugins/lock_timeout.rb', line 42 def identifier(*args) args.join('-') end |
#lock_expired_before_release(*args) ⇒ Object
Hook method; called when the lock expired before we released it.
95 96 |
# File 'lib/resque/plugins/lock_timeout.rb', line 95 def lock_expired_before_release(*args) end |
#lock_failed(*args) ⇒ Object
Hook method; called when a were unable to aquire the lock.
88 89 |
# File 'lib/resque/plugins/lock_timeout.rb', line 88 def lock_failed(*args) end |
#lock_redis ⇒ Redis
Override to fully control the redis object used for storing the locks.
The default is Resque.redis
52 53 54 |
# File 'lib/resque/plugins/lock_timeout.rb', line 52 def lock_redis Resque.redis end |
#lock_timeout(*args) ⇒ Fixnum
Number of seconds the lock may be held for. A value of 0 or below will lock without a timeout.
73 74 75 |
# File 'lib/resque/plugins/lock_timeout.rb', line 73 def lock_timeout(*args) @lock_timeout ||= 0 end |
#locked?(*args) ⇒ Boolean
Convenience method, not used internally.
80 81 82 |
# File 'lib/resque/plugins/lock_timeout.rb', line 80 def locked?(*args) lock_redis.exists(redis_lock_key(*args)) end |
#redis_lock_key(*args) ⇒ String
Override to fully control the lock key used. It is passed the job arguments.
The default looks like this: ‘resque-lock-timeout:<class name>:<identifier>`
64 65 66 |
# File 'lib/resque/plugins/lock_timeout.rb', line 64 def redis_lock_key(*args) ['lock', name, identifier(*args)].compact.join(':') end |
#refresh_lock!(*args) ⇒ Object
Refresh the lock.
160 161 162 163 164 |
# File 'lib/resque/plugins/lock_timeout.rb', line 160 def refresh_lock!(*args) now = Time.now.to_i lock_until = now + lock_timeout(*args) lock_redis.set(redis_lock_key(*args), lock_until) end |
#release_lock!(*args) ⇒ Object
Release the lock.
153 154 155 |
# File 'lib/resque/plugins/lock_timeout.rb', line 153 def release_lock!(*args) lock_redis.del(redis_lock_key(*args)) end |