Class: Concurrent::ReentrantReadWriteLock
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::Object
- Concurrent::ReentrantReadWriteLock
- Defined in:
- lib/concurrent/atomic/reentrant_read_write_lock.rb
Overview
Re-entrant read-write lock implementation
Allows any number of concurrent readers, but only one concurrent writer (And while the “write” lock is taken, no read locks can be obtained either. Hence, the write lock can also be called an “exclusive” lock.)
If another thread has taken a read lock, any thread which wants a write lock will block until all the readers release their locks. However, once a thread starts waiting to obtain a write lock, any additional readers that come along will also wait (so writers are not starved).
A thread can acquire both a read and write lock at the same time. A thread can also acquire a read lock OR a write lock more than once. Only when the read (or write) lock is released as many times as it was acquired, will the thread actually let it go, allowing other threads which might have been waiting to proceed.
If both read and write locks are acquired by the same thread, it is not strictly necessary to release them in the same order they were acquired. In other words, the following code is legal:
This implementation was inspired by ‘java.util.concurrent.ReentrantReadWriteLock`.
Instance Method Summary collapse
-
#acquire_read_lock ⇒ Boolean
Acquire a read lock.
-
#acquire_write_lock ⇒ Boolean
Acquire a write lock.
-
#initialize ⇒ ReentrantReadWriteLock
constructor
Create a new ‘ReentrantReadWriteLock` in the unlocked state.
-
#release_read_lock ⇒ Boolean
Release a previously acquired read lock.
-
#release_write_lock ⇒ Boolean
Release a previously acquired write lock.
-
#try_read_lock ⇒ Boolean
Try to acquire a read lock and return true if we succeed.
-
#try_write_lock ⇒ Boolean
Try to acquire a write lock and return true if we succeed.
-
#with_read_lock { ... } ⇒ Object
Execute a block operation within a read lock.
-
#with_write_lock { ... } ⇒ Object
Execute a block operation within a write lock.
Methods inherited from Synchronization::Object
attr_atomic, attr_volatile, ensure_safe_initialization_when_final_fields_are_present, new, safe_initialization!, safe_initialization?, volatile_cas_fields
Constructor Details
#initialize ⇒ ReentrantReadWriteLock
Create a new ‘ReentrantReadWriteLock` in the unlocked state.
105 106 107 108 109 110 111 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 105 def initialize super() @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadQueue = Synchronization::Lock.new # used to queue waiting readers @WriteQueue = Synchronization::Lock.new # used to queue waiting writers @HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread end |
Instance Method Details
#acquire_read_lock ⇒ Boolean
Acquire a read lock. If a write lock is held by another thread, will block until it is released.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 158 def acquire_read_lock if (held = @HeldCount.value) > 0 # If we already have a lock, there's no need to wait if held & READ_LOCK_MASK == 0 # But we do need to update the counter, if we were holding a write # lock but not a read lock @Counter.update { |c| c + 1 } end @HeldCount.value = held + 1 return true end while true c = @Counter.value raise ResourceLimitError.new('Too many reader threads') if max_readers?(c) # If a writer is waiting OR running when we first queue up, we need to wait if waiting_or_running_writer?(c) # Before going to sleep, check again with the ReadQueue mutex held @ReadQueue.synchronize do @ReadQueue.ns_wait if waiting_or_running_writer? end # Note: the above 'synchronize' block could have used #wait_until, # but that waits repeatedly in a loop, checking the wait condition # each time it wakes up (to protect against spurious wakeups) # But we are already in a loop, which is only broken when we successfully # acquire the lock! So we don't care about spurious wakeups, and would # rather not pay the extra overhead of using #wait_until # After a reader has waited once, they are allowed to "barge" ahead of waiting writers # But if a writer is *running*, the reader still needs to wait (naturally) while true c = @Counter.value if running_writer?(c) @ReadQueue.synchronize do @ReadQueue.ns_wait if running_writer? end elsif @Counter.compare_and_set(c, c+1) @HeldCount.value = held + 1 return true end end elsif @Counter.compare_and_set(c, c+1) @HeldCount.value = held + 1 return true end end end |
#acquire_write_lock ⇒ Boolean
Acquire a write lock. Will block and wait for all active readers and writers.
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 253 def acquire_write_lock if (held = @HeldCount.value) >= WRITE_LOCK_HELD # if we already have a write (exclusive) lock, there's no need to wait @HeldCount.value = held + WRITE_LOCK_HELD return true end while true c = @Counter.value raise ResourceLimitError.new('Too many writer threads') if max_writers?(c) # To go ahead and take the lock without waiting, there must be no writer # running right now, AND no writers who came before us still waiting to # acquire the lock # Additionally, if any read locks have been taken, we must hold all of them if c == held # If we successfully swap the RUNNING_WRITER bit on, then we can go ahead if @Counter.compare_and_set(c, c+RUNNING_WRITER) @HeldCount.value = held + WRITE_LOCK_HELD return true end elsif @Counter.compare_and_set(c, c+WAITING_WRITER) while true # Now we have successfully incremented, so no more readers will be able to increment # (they will wait instead) # However, readers OR writers could decrement right here @WriteQueue.synchronize do # So we have to do another check inside the synchronized section # If a writer OR another reader is running, then go to sleep c = @Counter.value @WriteQueue.ns_wait if running_writer?(c) || running_readers(c) != held end # Note: if you are thinking of replacing the above 'synchronize' block # with #wait_until, read the comment in #acquire_read_lock first! # We just came out of a wait # If we successfully turn the RUNNING_WRITER bit on with an atomic swap, # then we are OK to stop waiting and go ahead # Otherwise go back and wait again c = @Counter.value if !running_writer?(c) && running_readers(c) == held && @Counter.compare_and_set(c, c+RUNNING_WRITER-WAITING_WRITER) @HeldCount.value = held + WRITE_LOCK_HELD return true end end end end end |
#release_read_lock ⇒ Boolean
Release a previously acquired read lock.
232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 232 def release_read_lock held = @HeldCount.value = @HeldCount.value - 1 rlocks_held = held & READ_LOCK_MASK if rlocks_held == 0 c = @Counter.update { |counter| counter - 1 } # If one or more writers were waiting, and we were the last reader, wake a writer up if waiting_or_running_writer?(c) && running_readers(c) == 0 @WriteQueue.signal end elsif rlocks_held == READ_LOCK_MASK raise IllegalOperationError, "Cannot release a read lock which is not held" end true end |
#release_write_lock ⇒ Boolean
Release a previously acquired write lock.
327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 327 def release_write_lock held = @HeldCount.value = @HeldCount.value - WRITE_LOCK_HELD wlocks_held = held & WRITE_LOCK_MASK if wlocks_held == 0 c = @Counter.update { |counter| counter - RUNNING_WRITER } @ReadQueue.broadcast @WriteQueue.signal if waiting_writers(c) > 0 elsif wlocks_held == WRITE_LOCK_MASK raise IllegalOperationError, "Cannot release a write lock which is not held" end true end |
#try_read_lock ⇒ Boolean
Try to acquire a read lock and return true if we succeed. If it cannot be acquired immediately, return false.
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 211 def try_read_lock if (held = @HeldCount.value) > 0 if held & READ_LOCK_MASK == 0 # If we hold a write lock, but not a read lock... @Counter.update { |c| c + 1 } end @HeldCount.value = held + 1 return true else c = @Counter.value if !waiting_or_running_writer?(c) && @Counter.compare_and_set(c, c+1) @HeldCount.value = held + 1 return true end end false end |
#try_write_lock ⇒ Boolean
Try to acquire a write lock and return true if we succeed. If it cannot be acquired immediately, return false.
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 308 def try_write_lock if (held = @HeldCount.value) >= WRITE_LOCK_HELD @HeldCount.value = held + WRITE_LOCK_HELD return true else c = @Counter.value if !waiting_or_running_writer?(c) && running_readers(c) == held && @Counter.compare_and_set(c, c+RUNNING_WRITER) @HeldCount.value = held + WRITE_LOCK_HELD return true end end false end |
#with_read_lock { ... } ⇒ Object
Execute a block operation within a read lock.
122 123 124 125 126 127 128 129 130 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 122 def with_read_lock raise ArgumentError.new('no block given') unless block_given? acquire_read_lock begin yield ensure release_read_lock end end |
#with_write_lock { ... } ⇒ Object
Execute a block operation within a write lock.
141 142 143 144 145 146 147 148 149 |
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 141 def with_write_lock raise ArgumentError.new('no block given') unless block_given? acquire_write_lock begin yield ensure release_write_lock end end |