Module: Resque::Plugins::UniqueAtRuntime::ClassMethods

Defined in:
lib/resque/plugins/unique_at_runtime.rb

Instance Method Summary collapse

Instance Method Details

#around_perform_unlock_runtime(*args) ⇒ Object



98
99
100
101
102
# File 'lib/resque/plugins/unique_at_runtime.rb', line 98

def around_perform_unlock_runtime(*args)
  yield
ensure
  unlock_queue(*args)
end

#before_perform_lock_runtime(*args) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/resque/plugins/unique_at_runtime.rb', line 80

def before_perform_lock_runtime(*args)
  if (key = queue_locked?(*args))
    Resque::UniqueAtRuntime.debug("failed to lock queue with #{key}")

    # Sleep so the CPU's rest
    sleep(runtime_requeue_interval)

    # can't get the lock, so re-enqueue the task
    reenqueue(*args)

    # and don't perform
    raise Resque::Job::DontPerform
  else
    Resque::UniqueAtRuntime.debug('check passed will perform')
    true
  end
end

#can_lock_queue?(*args) ⇒ Boolean

returns true if the job signature can be locked (is not currently locked)

Returns:

  • (Boolean)


50
51
52
# File 'lib/resque/plugins/unique_at_runtime.rb', line 50

def can_lock_queue?(*args)
  !queue_locked?(*args)
end

#on_failure_unlock_runtime(*args) ⇒ Object

There may be scenarios where the around_perform’s ensure unlock±

duplicates the on_failure unlock, but that's a small price to pay for
uniqueness.


107
108
109
110
# File 'lib/resque/plugins/unique_at_runtime.rb', line 107

def on_failure_unlock_runtime(*args)
  Resque::UniqueAtRuntime.debug('on failure unlock')
  unlock_queue(*args)
end

#queue_locked?(*args) ⇒ Boolean

returns the locking key if locked, otherwise false

Returns:

  • (Boolean)


55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/resque/plugins/unique_at_runtime.rb', line 55

def queue_locked?(*args)
  now = Time.now.to_i
  key = unique_at_runtime_redis_key(*args)
  timeout = runtime_lock_timeout_at(now)

  Resque::UniqueAtRuntime.debug("attempting to lock queue with #{key}")

  # Per http://redis.io/commands/setnx
  return false if Resque.redis.setnx(key, timeout)
  return key if Resque.redis.get(key).to_i > now
  return false if Resque.redis.getset(key, timeout).to_i <= now

  key
end

#reenqueue(*args) ⇒ Object



76
77
78
# File 'lib/resque/plugins/unique_at_runtime.rb', line 76

def reenqueue(*args)
  Resque.enqueue(self, *args)
end

#runtime_lock_timeoutObject



27
28
29
30
# File 'lib/resque/plugins/unique_at_runtime.rb', line 27

def runtime_lock_timeout
  instance_variable_get(:@runtime_lock_timeout) ||
      instance_variable_set(:@runtime_lock_timeout, Resque::UniqueAtRuntime.configuration&.lock_timeout)
end

#runtime_lock_timeout_at(now) ⇒ Object



23
24
25
# File 'lib/resque/plugins/unique_at_runtime.rb', line 23

def runtime_lock_timeout_at(now)
  now + runtime_lock_timeout + 1
end

#runtime_requeue_intervalObject



32
33
34
35
# File 'lib/resque/plugins/unique_at_runtime.rb', line 32

def runtime_requeue_interval
  instance_variable_get(:@runtime_requeue_interval) ||
      instance_variable_set(:@runtime_requeue_interval, Resque::UniqueAtRuntime.configuration&.requeue_interval)
end

#unique_at_runtime_key_baseObject



37
38
39
40
# File 'lib/resque/plugins/unique_at_runtime.rb', line 37

def unique_at_runtime_key_base
  instance_variable_get(:@unique_at_runtime_key_base) ||
      instance_variable_set(:@unique_at_runtime_key_base, Resque::UniqueAtRuntime.configuration&.unique_at_runtime_key_base)
end

#unique_at_runtime_redis_key(*_) ⇒ Object

Overwrite this method to uniquely identify which mutex should be used for a resque worker.



44
45
46
47
# File 'lib/resque/plugins/unique_at_runtime.rb', line 44

def unique_at_runtime_redis_key(*_)
  Resque::UniqueAtRuntime.debug("getting key for #{@queue}!")
  "#{unique_at_runtime_key_base}:#{@queue}"
end

#unlock_queue(*args) ⇒ Object



70
71
72
73
74
# File 'lib/resque/plugins/unique_at_runtime.rb', line 70

def unlock_queue(*args)
  key = unique_at_runtime_redis_key(*args)
  Resque::UniqueAtRuntime.debug("unlock queue with #{key}")
  Resque.redis.del(key)
end