Class: Redis::EM::Mutex
- Inherits:
-
Object
- Object
- Redis::EM::Mutex
- Extended by:
- Errors
- Includes:
- Errors
- Defined in:
- lib/redis/em-mutex.rb,
lib/redis/em-mutex/ns.rb,
lib/redis/em-mutex/macro.rb,
lib/redis/em-mutex/version.rb,
lib/redis/em-mutex/pure_handler.rb,
lib/redis/em-mutex/script_handler.rb
Overview
Cross machine-process-fiber EventMachine + Redis based semaphore.
WARNING:
Methods of this class are NOT thread-safe. They are machine/process/fiber-safe. All method calls must be invoked only from EventMachine’s reactor thread. Wrap mutex calls in EventMachine.shedule from non-reactor threads.
-
The terms “lock” and “semaphore” used in documentation are synonims.
-
The term “owner” denotes a Ruby Fiber executing code in the scope of Machine/Process/Fiber possessing exclusively a named semaphore(s).
Defined Under Namespace
Modules: Errors, Macro, PureHandlerMixin, ScriptHandlerMixin Classes: NS
Constant Summary collapse
- SIGNAL_QUEUE_CHANNEL =
"::#{self.name}::"
- AUTO_NAME_SEED =
'__@'
- DEFAULT_RECONNECT_MAX_RETRIES =
10
- VERSION =
'0.3.1'
- @@default_expire =
3600*24
- @@name_index =
AUTO_NAME_SEED
- @@redis_pool =
nil
- @@watching =
false
- @@signal_queue =
Hash.new {|h,k| h[k] = []}
- @@ns =
nil
- @@handler =
nil
Class Attribute Summary collapse
-
.reconnect_max_retries ⇒ Object
Returns the value of attribute reconnect_max_retries.
Instance Attribute Summary collapse
-
#block_timeout ⇒ Object
Returns the value of attribute block_timeout.
-
#names ⇒ Object
readonly
Returns the value of attribute names.
-
#ns ⇒ Object
readonly
Returns the value of attribute ns.
Class Method Summary collapse
-
.can_refresh_expired? ⇒ Boolean
Whether selected implementation handler supports refreshing of already expired locks.
-
.default_expire ⇒ Object
Default value of expiration timeout in seconds.
-
.default_expire=(value) ⇒ Object
Assigns default value of expiration timeout in seconds.
-
.handler ⇒ Object
Selected implementation handler module name.
-
.lock(*args) ⇒ Object
Attempts to grab the lock and waits if it isn’t available.
- .ns ⇒ Object (also: namespace)
- .ns=(namespace) ⇒ Object (also: namespace=)
- .ready? ⇒ Boolean
- .reconnect_forever? ⇒ Boolean
-
.reset_autoname ⇒ Object
resets Mutex’s automatic name generator.
-
.setup(opts = {}) {|opts| ... } ⇒ Object
Setup redis database and other defaults.
- .setup_handler(handler = nil) ⇒ Object
-
.sleep(seconds) ⇒ Object
EM sleep helper.
-
.start_watcher ⇒ Object
Initializes the “unlock” channel watcher.
-
.stop_watcher(force = false) ⇒ Object
Stops the watcher of the “unlock” channel.
-
.sweep ⇒ Object
Remove all current Machine/Process locks.
-
.synchronize(*args, &block) ⇒ Object
Execute block of code protected with named semaphore.
- .wakeup_queue_all ⇒ Object
-
.watching? ⇒ Boolean
Returns true if watcher is connected.
Instance Method Summary collapse
-
#can_refresh_expired? ⇒ Boolean
Whether selected implementation handler supports refreshing of already expired locks.
- #expire_timeout ⇒ Object
- #expire_timeout=(value) ⇒ Object
-
#initialize(*args) ⇒ Mutex
constructor
Creates a new cross machine/process/fiber semaphore.
-
#namespace ⇒ Object
Returns the value of attribute ns.
-
#sleep(timeout = nil) ⇒ Object
Releases the lock and sleeps ‘timeout` seconds if it is given and non-nil or forever.
-
#synchronize(block_timeout = nil) ⇒ Object
Execute block of code protected with semaphore.
-
#unlock ⇒ Object
Releases the lock unconditionally.
-
#wakeup(fiber) ⇒ Object
(also: #_wakeup)
Wakes up currently sleeping fiber on a mutex.
-
#watching? ⇒ Boolean
Returns true if watcher is connected.
Constructor Details
#initialize(*args) ⇒ Mutex
Creates a new cross machine/process/fiber semaphore
Redis::EM::Mutex.new(*names, = {})
-
*names = lock identifiers - if none they are auto generated
-
options = hash:
-
:name - same as *names (in case *names arguments were omitted)
-
:block - default block timeout
-
:expire - default expire timeout (see: Mutex#lock and Mutex#try_lock)
-
:ns - local namespace (otherwise global namespace is used)
-
:owner - owner definition instead of Fiber#__id__
Raises MutexError if used before Mutex.setup. Raises ArgumentError on invalid options.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/redis/em-mutex.rb', line 88 def initialize(*args) raise MutexError, "call #{self.class}::setup first" unless @@redis_pool self.class.setup_handler unless @@handler opts = args.last.kind_of?(Hash) ? args.pop : {} @names = args @names = Array(opts[:name] || "#{@@name_index.succ!}.lock") if @names.empty? @slept = {} raise ArgumentError, "semaphore names must not be empty" if @names.empty? @multi = !@names.one? @ns = opts[:ns] || @@ns @ns_names = @ns ? @names.map {|n| "#@ns:#{n}".freeze }.freeze : @names.map {|n| n.to_s.dup.freeze }.freeze @marsh_names = Marshal.dump(@ns_names) self.expire_timeout = opts[:expire] if opts.key?(:expire) self.block_timeout = opts[:block] if opts.key?(:block) self.extend(@@handler) post_init(opts) end |
Class Attribute Details
.reconnect_max_retries ⇒ Object
Returns the value of attribute reconnect_max_retries.
189 190 191 |
# File 'lib/redis/em-mutex.rb', line 189 def reconnect_max_retries @reconnect_max_retries end |
Instance Attribute Details
#block_timeout ⇒ Object
Returns the value of attribute block_timeout.
108 109 110 |
# File 'lib/redis/em-mutex.rb', line 108 def block_timeout @block_timeout end |
#names ⇒ Object (readonly)
Returns the value of attribute names.
108 109 110 |
# File 'lib/redis/em-mutex.rb', line 108 def names @names end |
#ns ⇒ Object (readonly)
Returns the value of attribute ns.
108 109 110 |
# File 'lib/redis/em-mutex.rb', line 108 def ns @ns end |
Class Method Details
.can_refresh_expired? ⇒ Boolean
Whether selected implementation handler supports refreshing of already expired locks.
64 65 66 |
# File 'lib/redis/em-mutex.rb', line 64 def self.can_refresh_expired? @@handler.can_refresh_expired? end |
.default_expire ⇒ Object
Default value of expiration timeout in seconds.
202 |
# File 'lib/redis/em-mutex.rb', line 202 def default_expire; @@default_expire; end |
.default_expire=(value) ⇒ Object
Assigns default value of expiration timeout in seconds. Must be > 0.
206 207 208 209 |
# File 'lib/redis/em-mutex.rb', line 206 def default_expire=(value) raise ArgumentError, "#{name}.default_expire value must be greater than 0" unless (value = value.to_f) > 0 @@default_expire = value end |
.handler ⇒ Object
Selected implementation handler module name
60 |
# File 'lib/redis/em-mutex.rb', line 60 def self.handler; @@handler && @@handler.name end |
.lock(*args) ⇒ Object
Attempts to grab the lock and waits if it isn’t available. Raises MutexError if mutex was locked by the current owner or if used before Mutex.setup. Raises ArgumentError on invalid options. Returns instance of Redis::EM::Mutex if lock was successfully obtained. Returns ‘nil` if lock wasn’t available within ‘:block` seconds.
Redis::EM::Mutex.lock(*names, = {})
-
*names = lock identifiers - if none they are auto generated
-
options = hash:
-
:name - same as name (in case *names arguments were omitted)
-
:block - block timeout
-
:expire - expire timeout (see: Mutex#lock and Mutex#try_lock)
-
:ns - namespace (otherwise global namespace is used)
457 458 459 460 |
# File 'lib/redis/em-mutex.rb', line 457 def lock(*args) mutex = new(*args) mutex if mutex.lock end |
.ns ⇒ Object Also known as: namespace
196 |
# File 'lib/redis/em-mutex.rb', line 196 def ns; @@ns; end |
.ns=(namespace) ⇒ Object Also known as: namespace=
197 |
# File 'lib/redis/em-mutex.rb', line 197 def ns=(namespace); @@ns = namespace; end |
.ready? ⇒ Boolean
331 332 333 |
# File 'lib/redis/em-mutex.rb', line 331 def ready? !!@@redis_pool end |
.reconnect_forever? ⇒ Boolean
190 191 192 |
# File 'lib/redis/em-mutex.rb', line 190 def reconnect_forever? @reconnect_max_retries < 0 end |
.reset_autoname ⇒ Object
resets Mutex’s automatic name generator
336 337 338 |
# File 'lib/redis/em-mutex.rb', line 336 def reset_autoname @@name_index = AUTO_NAME_SEED end |
.setup(opts = {}) {|opts| ... } ⇒ Object
Setup redis database and other defaults. MUST BE called once before any semaphore is created.
opts = options Hash:
global options:
-
:connection_pool_class - default is Redis::EM::ConnectionPool
-
:redis_factory - default is proc {|redis_opts| Redis.new redis_opts }
-
:handler - the default value is taken from envronment variable: REDIS_EM_MUTEX_HANDLER or :auto
:pure - optimistic locking commands based (redis-server >= 2.4) :script - server scripting based (redis-server >= 2.6) :auto - autodetect and choose best available handler
-
:expire - sets global Mutex.default_expire
-
:ns - sets global Mutex.namespace
-
:reconnect_max - maximum num. of attempts to re-establish connection to redis server; default is 10; set to 0 to disable re-connecting; set to -1 or :forever to attempt forever
redis connection options:
-
:size - redis connection pool size
passed directly to redis_factory:
-
:url - redis server url
or
-
:scheme - “redis” or “unix”
-
:host - redis host
-
:port - redis port
-
:password - redis password
-
:db - redis database number
-
:path - redis unix-socket path
or
-
:redis - initialized ConnectionPool of Redis clients.
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
# File 'lib/redis/em-mutex.rb', line 251 def setup(opts = {}) stop_watcher @watcher_subscribed = nil opts = OpenStruct.new(opts) yield opts if block_given? = {:driver => :synchrony} redis_updater = proc do |redis| .update({ :scheme => redis.scheme, :host => redis.host, :port => redis.port, :password => redis.password, :db => redis.db, :path => redis.path }.reject {|_k, v| v.nil?}) end if (redis = opts.redis) && !opts.url redis_updater.call redis.client elsif opts.url [:url] = opts.url end redis_updater.call opts pool_size = (opts.size.to_i.nonzero? || 1).abs self.default_expire = opts.expire if opts.expire self.reconnect_max_retries = opts.reconnect_max if opts.reconnect_max @connection_pool_class = opts.connection_pool_class if opts.connection_pool_class.kind_of?(Class) @redis_options = @reconnect_max_retries ||= DEFAULT_RECONNECT_MAX_RETRIES @redis_factory = opts.redis_factory if opts.redis_factory @redis_factory ||= proc {|opts| Redis.new opts } raise TypeError, "redis_factory should respond to [] method" unless @redis_factory.respond_to?(:[]) @@ns = opts.ns if opts.ns unless (@@redis_pool = redis) unless @connection_pool_class begin require 'redis/em-connection-pool' unless defined?(Redis::EM::ConnectionPool) rescue LoadError raise ":connection_pool_class required; could not fall back to Redis::EM::ConnectionPool" end @connection_pool_class = Redis::EM::ConnectionPool end @@redis_pool = @connection_pool_class.new(size: pool_size) do @redis_factory[] end end @redis_watcher = @redis_factory[] start_watcher if ::EM.reactor_running? case handler = opts.handler || @@handler when Module @@handler = handler when nil, Symbol, String setup_handler(handler) else raise TypeError, 'handler must be Symbol or Module' end end |
.setup_handler(handler = nil) ⇒ Object
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/redis/em-mutex.rb', line 309 def setup_handler(handler = nil) handler = (handler || ENV['REDIS_EM_MUTEX_HANDLER'] || :auto).to_sym.downcase if handler == :auto return unless ::EM.reactor_running? handler = :script begin @@redis_pool.script(:exists) rescue Redis::CommandError handler = :pure end end const_name = "#{handler.to_s.capitalize}HandlerMixin" begin unless self.const_defined?(const_name) require "redis/em-mutex/#{handler}_handler" end @@handler = self.const_get(const_name) rescue LoadError, NameError raise "handler: #{handler} not found" end end |
.sleep(seconds) ⇒ Object
EM sleep helper
405 406 407 408 409 |
# File 'lib/redis/em-mutex.rb', line 405 def sleep(seconds) fiber = Fiber.current ::EM::Timer.new(seconds) { fiber.resume } Fiber.yield end |
.start_watcher ⇒ Object
Initializes the “unlock” channel watcher. It’s called by Mutex.setup internally. Should not be used under normal circumstances. If EventMachine is to be re-started (or after EM.fork_reactor) this method may be used instead of Mutex.setup for “lightweight” startup procedure.
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/redis/em-mutex.rb', line 350 def start_watcher raise MutexError, "call #{self.class}::setup first" unless @redis_watcher return if watching? if @@watching # Process id changed, we've been forked alive! @redis_watcher = @redis_factory[@redis_options] @@signal_queue.clear end @@watching = $$ retries = 0 Fiber.new do begin @redis_watcher.subscribe(SIGNAL_QUEUE_CHANNEL) do |on| on.subscribe do |channel,| if channel == SIGNAL_QUEUE_CHANNEL @watcher_subscribed = true retries = 0 wakeup_queue_all end end on. do |channel, | if channel == SIGNAL_QUEUE_CHANNEL sig_match = {} Marshal.load().each do |name| sig_match[@@signal_queue[name].first] = true if @@signal_queue.key?(name) end sig_match.keys.each do |sig_proc| sig_proc.call if sig_proc end end end on.unsubscribe do |channel,| @watcher_subscribed = false if channel == SIGNAL_QUEUE_CHANNEL end end break rescue Redis::BaseConnectionError, EventMachine::ConnectionError => e @watcher_subscribed = false warn e. retries+= 1 if retries > reconnect_max_retries && reconnect_max_retries >= 0 @@watching = false else sleep retries > 1 ? 1 : 0.1 end end while watching? end.resume until @watcher_subscribed raise MutexError, "Can not establish watcher channel connection!" unless watching? fiber = Fiber.current ::EM.next_tick { fiber.resume } Fiber.yield end end |
.stop_watcher(force = false) ⇒ Object
Stops the watcher of the “unlock” channel. It should be called before stopping EvenMachine otherwise EM might wait forever for channel connection to be closed.
Raises MutexError if there are still some fibers waiting for lock. Pass ‘true` to forcefully stop it. This might instead cause MutexError to be raised in waiting fibers.
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 |
# File 'lib/redis/em-mutex.rb', line 418 def stop_watcher(force = false) return unless watching? raise MutexError, "call #{self.class}::setup first" unless @redis_watcher unless @@signal_queue.empty? || force raise MutexError, "can't stop: semaphores in queue" end @@watching = false if @watcher_subscribed @redis_watcher.unsubscribe SIGNAL_QUEUE_CHANNEL while @watcher_subscribed fiber = Fiber.current ::EM.next_tick { fiber.resume } Fiber.yield end end end |
.sweep ⇒ Object
Remove all current Machine/Process locks. Since there is no lock tracking mechanism, it might not be implemented easily. If the need arises then it probably should be implemented.
438 439 440 |
# File 'lib/redis/em-mutex.rb', line 438 def sweep raise NotImplementedError end |
.synchronize(*args, &block) ⇒ Object
Execute block of code protected with named semaphore. Returns result of code block.
Redis::EM::Mutex.synchronize(*names, = {}, &block)
-
*names = lock identifiers - if none they are auto generated
-
options = hash:
-
:name - same as name (in case *names arguments were omitted)
-
:block - block timeout
-
:expire - expire timeout (see: Mutex#lock and Mutex#try_lock)
-
:ns - namespace (otherwise global namespace is used)
If ‘:block` is set and lock isn’t obtained within ‘:block` seconds this method raises MutexTimeout. Raises MutexError if used before Mutex.setup. Raises ArgumentError on invalid options.
478 479 480 |
# File 'lib/redis/em-mutex.rb', line 478 def synchronize(*args, &block) new(*args).synchronize(&block) end |
.wakeup_queue_all ⇒ Object
340 341 342 343 344 |
# File 'lib/redis/em-mutex.rb', line 340 def wakeup_queue_all @@signal_queue.each_value do |queue| queue.each {|h| h.call } end end |
.watching? ⇒ Boolean
Returns true if watcher is connected
186 |
# File 'lib/redis/em-mutex.rb', line 186 def self.watching?; @@watching == $$; end |
Instance Method Details
#can_refresh_expired? ⇒ Boolean
Whether selected implementation handler supports refreshing of already expired locks.
70 71 72 |
# File 'lib/redis/em-mutex.rb', line 70 def can_refresh_expired? @@handler.can_refresh_expired? end |
#expire_timeout ⇒ Object
111 |
# File 'lib/redis/em-mutex.rb', line 111 def expire_timeout; @expire_timeout || @@default_expire; end |
#expire_timeout=(value) ⇒ Object
113 114 115 116 |
# File 'lib/redis/em-mutex.rb', line 113 def expire_timeout=(value) raise ArgumentError, "#{self.class.name}\#expire_timeout value must be greater than 0" unless (value = value.to_f) > 0 @expire_timeout = value end |
#namespace ⇒ Object
Returns the value of attribute ns.
109 110 111 |
# File 'lib/redis/em-mutex.rb', line 109 def ns @ns end |
#sleep(timeout = nil) ⇒ Object
Releases the lock and sleeps ‘timeout` seconds if it is given and non-nil or forever. Raises MutexError if mutex wasn’t locked by the current owner. Raises MutexTimeout if #block_timeout= was set and timeout occured while locking after sleep. If code block is provided it is executed after waking up, just before grabbing a lock.
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/redis/em-mutex.rb', line 143 def sleep(timeout = nil) raise MutexError, "can't sleep #{self.class} wasn't locked" unless unlock! start = Time.now current = Fiber.current @slept[current] = true if timeout timer = ::EM.add_timer(timeout) do wakeup(current) end Fiber.yield ::EM.cancel_timer timer else Fiber.yield end @slept.delete current yield if block_given? raise MutexTimeout unless lock Time.now - start end |
#synchronize(block_timeout = nil) ⇒ Object
Execute block of code protected with semaphore. Code block receives mutex object. Returns result of code block.
If ‘block_timeout` or Mutex#block_timeout is set and lock isn’t obtained within ‘block_timeout` seconds this method raises MutexTimeout.
170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/redis/em-mutex.rb', line 170 def synchronize(block_timeout=nil) if lock(block_timeout) begin yield self ensure unlock end else raise MutexTimeout end end |
#unlock ⇒ Object
Releases the lock unconditionally. If the semaphore wasn’t locked by the current owner it is silently ignored. Returns self.
125 126 127 128 |
# File 'lib/redis/em-mutex.rb', line 125 def unlock unlock! self end |
#wakeup(fiber) ⇒ Object Also known as: _wakeup
Wakes up currently sleeping fiber on a mutex.
131 132 133 |
# File 'lib/redis/em-mutex.rb', line 131 def wakeup(fiber) fiber.resume if @slept.delete(fiber) end |
#watching? ⇒ Boolean
Returns true if watcher is connected
183 |
# File 'lib/redis/em-mutex.rb', line 183 def watching?; @@watching == $$; end |