Module: Redis::EM::Mutex::PureHandlerMixin

Includes:
Errors
Defined in:
lib/redis/em-mutex/pure_handler.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.can_refresh_expired?Boolean

Returns:

  • (Boolean)


7
# File 'lib/redis/em-mutex/pure_handler.rb', line 7

def self.can_refresh_expired?; true end

Instance Method Details

#expiration_timestampObject

Returns timestamp at which the semaphore will expire or have expired. Returns ‘nil` if the semaphore wasn’t locked by current owner.

The check is performed only on the Mutex object instance and should only be used as a hint. For reliable lock status information use #refresh or #owned? instead.



81
82
83
# File 'lib/redis/em-mutex/pure_handler.rb', line 81

def expiration_timestamp
  @locked_id.to_f if @locked_id && owner_ident(@locked_id) == @locked_owner_id
end

#expired?Boolean

Returns ‘true` when the semaphore is being held and have already expired. Returns `false` when the semaphore is still locked and valid or `nil` if the semaphore wasn’t locked by current owner.

The check is performed only on the Mutex object instance and should only be used as a hint. For reliable lock status information use #refresh or #owned? instead.

Returns:

  • (Boolean)


52
53
54
# File 'lib/redis/em-mutex/pure_handler.rb', line 52

def expired?
  Time.now.to_f > @locked_id.to_f if @locked_id && owner_ident(@locked_id) == @locked_owner_id
end

#expires_atObject

Returns local time at which the semaphore will expire or have expired. Returns ‘nil` if the semaphore wasn’t locked by current owner.

The check is performed only on the Mutex object instance and should only be used as a hint. For reliable lock status information use #refresh or #owned? instead.



72
73
74
# File 'lib/redis/em-mutex/pure_handler.rb', line 72

def expires_at
  Time.at(@locked_id.to_f) if @locked_id && owner_ident(@locked_id) == @locked_owner_id
end

#expires_inObject

Returns the number of seconds left until the semaphore expires. The number of seconds less than 0 means that the semaphore expired and could be grabbed by some other owner. Returns ‘nil` if the semaphore wasn’t locked by current owner.

The check is performed only on the Mutex object instance and should only be used as a hint. For reliable lock status information use #refresh or #owned? instead.



63
64
65
# File 'lib/redis/em-mutex/pure_handler.rb', line 63

def expires_in
  @locked_id.to_f - Time.now.to_f if @locked_id && owner_ident(@locked_id) == @locked_owner_id
end

#lock(block_timeout = nil) ⇒ Object

Attempts to grab the lock and waits if it isn’t available. Raises MutexError if mutex was locked by the current owner. Returns ‘true` if lock was successfully obtained. Returns `false` if lock wasn’t available within ‘block_timeout` seconds.

If ‘block_timeout` is `nil` or omited this method uses Mutex#block_timeout. If also Mutex#block_timeout is nil this method returns only after lock has been granted.

Use Mutex#expire_timeout= to set lock expiration timeout. Otherwise global Mutex.default_expire is used.



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/redis/em-mutex/pure_handler.rb', line 167

def lock(block_timeout = nil)
  block_timeout||= self.block_timeout
  names = @ns_names
  timer = fiber = nil
  try_again = false
  sig_proc = proc do
    try_again = true
    ::EM.next_tick { fiber.resume if fiber } if fiber
  end
  begin
    Mutex.start_watcher unless watching?
    queues = names.map {|n| signal_queue[n] << sig_proc }
    ident_match = owner_ident
    until try_lock
      start_time = Time.now.to_f
      expire_time = nil
      redis_pool.watch(*names) do |r|
        expired_names = names.zip(r.mget(*names)).map do |name, lock_value|
          if lock_value
            owner, exp_id = lock_value.split ' '
            exp_time = exp_id.to_f
            expire_time = exp_time if expire_time.nil? || exp_time < expire_time
            if exp_time < start_time
              name
            elsif owner == ident_match
              raise MutexError, "deadlock; recursive locking #{owner}"
            end
          end
        end
        if expire_time && expire_time < start_time
          r.multi do |multi|
            expired_names = expired_names.compact
            multi.del(*expired_names)
          end
        else
          r.unwatch
        end
      end
      timeout = (expire_time = expire_time.to_f) - start_time
      timeout = block_timeout if block_timeout && block_timeout < timeout

      if !try_again && timeout > 0
        timer = ::EM::Timer.new(timeout) do
          timer = nil
          ::EM.next_tick { fiber.resume if fiber } if fiber
        end
        fiber = Fiber.current
        Fiber.yield
        fiber = nil
      end
      finish_time = Time.now.to_f
      if try_again || finish_time > expire_time
        block_timeout-= finish_time - start_time if block_timeout
        try_again = false
      else
        return false
      end
    end
    true
  ensure
    timer.cancel if timer
    timer = nil
    queues.each {|q| q.delete sig_proc }
    names.each {|n| signal_queue.delete(n) if signal_queue[n].empty? }
  end
end

#locked?Boolean

Returns ‘true` if this semaphore (at least one of locked `names`) is currently being held by some owner.

Returns:

  • (Boolean)


28
29
30
31
32
33
34
35
36
# File 'lib/redis/em-mutex/pure_handler.rb', line 28

def locked?
  if @multi
    redis_pool.multi do |multi|
      @ns_names.each {|n| multi.exists n}
    end.any?
  else
    redis_pool.exists @ns_names.first
  end
end

#owned?Boolean

Returns ‘true` if this semaphore (all the locked `names`) is currently being held by calling owner. This is the method you should use to check if lock is still held and valid.

Returns:

  • (Boolean)


40
41
42
43
44
# File 'lib/redis/em-mutex/pure_handler.rb', line 40

def owned?
  !!if @locked_id && owner_ident(@locked_id) == (lock_full_ident = @locked_owner_id)
    redis_pool.mget(*@ns_names).all? {|v| v == lock_full_ident}
  end
end

#owner_ident(lock_id = nil) ⇒ Object



234
235
236
237
238
239
240
# File 'lib/redis/em-mutex/pure_handler.rb', line 234

def owner_ident(lock_id = nil)
  if lock_id
    "#{uuid}$#$$@#{Fiber.current.__id__} #{lock_id}"
  else
    "#{uuid}$#$$@#{Fiber.current.__id__}"
  end
end

#refresh(expire_timeout = nil) ⇒ Object

Refreshes lock expiration timeout. Returns ‘true` if refresh was successfull. Returns `false` if the semaphore wasn’t locked or when it was locked but it has expired and now it’s got a new owner.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/redis/em-mutex/pure_handler.rb', line 113

def refresh(expire_timeout=nil)
  ret = false
  if @locked_id && owner_ident(@locked_id) == (lock_full_ident = @locked_owner_id)
    new_lock_id = (Time.now + (expire_timeout.to_f.nonzero? || self.expire_timeout)).to_f.to_s
    new_lock_full_ident = owner_ident(new_lock_id)
    redis_pool.watch(*@ns_names) do |r|
      if r.mget(*@ns_names).all? {|v| v == lock_full_ident}
        if r.multi {|m| m.mset(*@ns_names.map {|k| [k, new_lock_full_ident]}.flatten)}
          @locked_id = new_lock_id
          @locked_owner_id = new_lock_full_ident
          ret = true
        end
      else
        r.unwatch
      end
    end
  end
  ret
end

#try_lockObject

This method is for internal use only.

Attempts to obtain the lock and returns immediately. Returns ‘true` if the lock was granted. Use Mutex#expire_timeout= to set lock expiration time in secods. Otherwise global Mutex.default_expire is used.

This method doesn’t capture expired semaphores in “pure” implementation and therefore it should NEVER be used under normal circumstances. Use Mutex#lock with block_timeout = 0 to obtain expired lock without blocking.



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/redis/em-mutex/pure_handler.rb', line 95

def try_lock
  lock_id = (Time.now + expire_timeout).to_f.to_s
  lock_full_ident = owner_ident(lock_id)
  !!if @multi
    if redis_pool.msetnx(*@ns_names.map {|k| [k, lock_full_ident]}.flatten)
      @locked_id = lock_id
      @locked_owner_id = lock_full_ident
    end
  elsif redis_pool.setnx(@ns_names.first, lock_full_ident)
    @locked_id = lock_id
    @locked_owner_id = lock_full_ident
  end
end

#unlock!Object

Releases the lock. Returns self on success. Returns ‘false` if the semaphore wasn’t locked or when it was locked but it has expired and now it’s got a new owner. In case of unlocking multiple name semaphore this method returns self only when all of the names have been unlocked successfully.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/redis/em-mutex/pure_handler.rb', line 138

def unlock!
  ret = false
  if (locked_id = @locked_id) && owner_ident(@locked_id) == (lock_full_ident = @locked_owner_id)
    @locked_owner_id = @locked_id = nil
    redis_pool.watch(*@ns_names) do |r|
      if r.mget(*@ns_names).all? {|v| v == lock_full_ident}
        ret = !!r.multi do |multi|
          multi.del(*@ns_names)
          multi.publish SIGNAL_QUEUE_CHANNEL, @marsh_names if Time.now.to_f < locked_id.to_f
        end
      else
        r.unwatch
      end
    end
  end
  ret && self
end