Class: Mongo::Locking::Locker
- Inherits:
-
Object
- Object
- Mongo::Locking::Locker
- Includes:
- Exceptions
- Defined in:
- lib/mongo/locking/locker.rb
Overview
Locker is a container for isolating all the locking-related methods, so we minimally impact the namespace of wherever we’re mixed into.
In addition to the Mongo-based, per-process, blocking lock mechanism itself, we also employ a thread-local lock refcount to achieve non-blocking behaviour when nesting lock closures. This is useful for when multiple, isolated code paths are all defensive with locks, but are arbitrarily called within the same thread of execution.
We try to limit the number of these objects, so as to minimize extra cost attached to model instance hydration. Thus the Locker is attached to the model class, and maintains refcounts based on the key it’s configured with.
Constant Summary collapse
- DEFAULT_OPTIONS =
{ :max_retries => 5, :first_retry_interval => 0.2.seconds, :max_retry_interval => 5.seconds, :max_lifetime => 10.minutes, }
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
-
#acquire(from) ⇒ Object
We increment refcounts ASARP so that any further (nested) calls won’t block.
-
#initialize(opts = {}) ⇒ Locker
constructor
A new instance of Locker.
- #is_root? ⇒ Boolean
- #key_for(lockable) ⇒ Object
- #parent_for(lockable) ⇒ Object
- #refcounts ⇒ Object
- #release(from) ⇒ Object
- #root_for(from) ⇒ Object
- #scope_for(lockable) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Locker
Returns a new instance of Locker.
34 35 36 37 |
# File 'lib/mongo/locking/locker.rb', line 34 def initialize(opts = {}) @config = DEFAULT_OPTIONS.merge(opts) @refcount_key = "mongo_locking_refcounts_#{@config[:class_name]}" end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
24 25 26 |
# File 'lib/mongo/locking/locker.rb', line 24 def config @config end |
Instance Method Details
#acquire(from) ⇒ Object
We increment refcounts ASARP so that any further (nested) calls won’t block. But that means we have to make sure to decrement it on any failure case.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/mongo/locking/locker.rb', line 46 def acquire(from) lockable = root_for(from) locker = lockable.class.locker scope = locker.scope_for(lockable) key = locker.key_for(lockable) name = scope + "/" + key refcounts[key] += 1 if refcounts[key] > 1 info "acquire: re-using lock for #{name}##{refcounts[key]}" return lockable end target = { :scope => scope, :key => key } interval = self.config[:first_retry_interval] retries = 0 debug "acquire: attempting lock of #{name}" begin a_lock = atomic_inc(target, { :refcount => 1 }) refcount = a_lock['refcount'] # If the refcount is 0 or somehow less than 0, after we just # incremented, then retry without counting against the max. if refcount < 1 retries -= 1 debug "acquire: refcount #{refcount}, unexpected state" raise LockFailure end # Check lock expiration. if a_lock.has_key?('expire_at') and a_lock['expire_at'] < Time.now # If the lock is "expired". We assume the owner of the # lock is "gone" without decrementing the refcount. warn "acquire: #{name} lock expired" # Attempt to decrement the refcount to "reverse" the # damage caused by the "gone" process. There might be # more than one process trying to do this at the same # time. Therefore, we need the refcount > 1 guard. If # the lock's refcount is no longer > 1 by the time this # process try to decrement, Mongo will raise a # Mongo::OperationFailure. Regardless of reason, if it # fails, we fail. a_lock = atomic_inc(target.merge({:refcount => {'$gt' => 1} }), { :refcount => -1 }) rescue nil unless a_lock # We lost the race to "reverse" the damage. Someone # else has the lock now. We will retry. raise LockFailure end # We have won the race to "reverse" the damage but we # may have not "reversed" enough of the damage. # Consider the case that the expired lock has a large # refcount - we still need to check refcount to make # sure that we are have acquired the lock. refcount = a_lock['refcount'] # The rest of the expired_lock handling logic coincides # with normal lock logic. end # If recount is greater than 1, we lost the race. Decrement # and try again. if refcount > 1 atomic_inc(target, { :refcount => -1 }) debug "acquire: refcount #{refcount}, race lost" raise LockFailure end # If refcount == 1, we have the lock and thus renew # its expire_at. # # NOTE: This expire_at renewal only happens when a process # acquires the lock for the first time. Subsequent lock # reuse will NOT renew expire_at. This assumes that all # legitimate operations should complete within # config[:max_lifetime] time limit. atomic_update(target, {'expire_at' => self.config[:max_lifetime].from_now}) rescue LockFailure => e retries += 1 if retries >= self.config[:max_retries] refcounts[key] -= 1 raise LockTimeout, "unable to acquire lock #{name}" end warn "acquire: #{name} refcount #{refcount}, retry #{retries} for lock" sleep(interval.to_f) interval = [self.config[:max_retry_interval].to_f, interval * 2].min retry rescue => e refcounts[key] -= 1 log_exception(e) raise LockFailure, "unable to acquire lock #{name}" end info "acquire: #{name} locked (#{refcount})" return lockable end |
#is_root? ⇒ Boolean
247 248 249 |
# File 'lib/mongo/locking/locker.rb', line 247 def is_root? self.config[:parent].nil? end |
#key_for(lockable) ⇒ Object
221 222 223 224 225 226 227 |
# File 'lib/mongo/locking/locker.rb', line 221 def key_for(lockable) return case key = self.config[:key] when Proc then key.call(self).to_s when Symbol then lockable.send(key).to_s else raise InvalidConfig, "unknown key type #{key.inspect}" end end |
#parent_for(lockable) ⇒ Object
238 239 240 241 242 243 244 245 |
# File 'lib/mongo/locking/locker.rb', line 238 def parent_for(lockable) return case parent = self.config[:parent] when Proc then parent.call(lockable) when Symbol then lockable.send(parent) when NilClass then nil else raise InvalidConfig, "unknown parent type #{parent.inspect}" end end |
#refcounts ⇒ Object
39 40 41 |
# File 'lib/mongo/locking/locker.rb', line 39 def refcounts Thread.current[@refcount_key] ||= Hash.new(0) end |
#release(from) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/mongo/locking/locker.rb', line 154 def release(from) lockable = root_for(from) locker = lockable.class.locker key = locker.key_for(lockable) scope = locker.scope_for(lockable) name = scope + "/" + key refcounts[key] -= 1 if refcounts[key] > 0 info "release: re-using lock for #{name}##{refcounts[key]}" return true end target = { :scope => scope, :key => key } refcount = atomic_inc(target, { :refcount => -1 })['refcount'] info "release: #{name} unlocked (#{refcount})" # If the refcount is at zero, nuke it out of the table. # # NOTE: If the following delete fails (e.g. something else # incremented it before we tried to delete it), it will raise: # # <Mongo::OperationFailure: Database command 'findandmodify' # failed: {"errmsg"=>"No matching object found", "ok"=>0.0}> # # This is normal for a concurrent system. # # Since a lock with refcount 0 does not impact lock functionality, # we can also ignore any other exceptions during lock deletion. # # We use 'rescue nil' to ignore all exceptions. if refcount == 0 if hash = atomic_delete(target.merge({ :refcount => 0 })) rescue nil debug "release: lock #{name} no longer needed, deleted" end # Nuke the key from our instance refcounts so we don't # balloon during long-lived processes. refcounts.delete(key) end rescue => e log_exception(e) raise LockFailure, "unable to release lock #{name}" end |
#root_for(from) ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/mongo/locking/locker.rb', line 202 def root_for(from) lockable = from visited = Set.new([lockable.class]) while parent = lockable.class.locker.parent_for(lockable) lockable = parent if visited.include? lockable.class raise CircularLock, "already visited #{lockable.class} (#{visited.inspect})" end visited << lockable.class end raise InvalidConfig, "root #{lockable.inspect} is not lockable" unless lockable.class.locker.is_root? return lockable end |
#scope_for(lockable) ⇒ Object
229 230 231 232 233 234 235 236 |
# File 'lib/mongo/locking/locker.rb', line 229 def scope_for(lockable) return case scope = self.config[:scope] when Proc then scope.call(lockable).to_s when Symbol then scope.to_s when String then scope else raise InvalidConfig, "unknown scope type #{scope.inspect}" end end |