Module: Resque::Integration::Unique
- Defined in:
- lib/resque/integration/unique.rb
Overview
Unique job
Defined Under Namespace
Modules: Overrides
Constant Summary collapse
- LOCK_TIMEOUT =
3 days
259_200
Class Method Summary collapse
Instance Method Summary collapse
-
#after_dequeue_lock(_meta_id, *args) ⇒ Object
When job is dequeued we should remove lock.
-
#after_dequeue_meta(*args) ⇒ Object
Fail metadata if dequeue succeed.
- #around_perform_lock(_meta_id, *args) ⇒ Object
-
#before_dequeue_lock(*args) ⇒ Object
Before dequeue check if job is running.
-
#before_enqueue_lock(_meta_id, *args) ⇒ Object
Before enqueue acquire a lock.
-
#dequeue(*args) ⇒ Object
Dequeue unique job.
- #enqueue_to(queue, *args) ⇒ Object
-
#enqueued?(*args) ⇒ Boolean
Is job already in queue or in process?.
- #execute ⇒ Object
-
#lock_id(*args) ⇒ Object
private
LockID should be independent from MetaID.
-
#lock_on(&block) ⇒ Object
Get or set proc returning unique arguments.
- #lock_timeout ⇒ Object
-
#locked?(*args) ⇒ Boolean
Returns true if resque job is in locked state.
-
#meta ⇒ Object
get meta object associated with job.
-
#on_failure_lock(_e, _meta_id, *args) ⇒ Object
When job is failed we should remove lock.
- #on_failure_retry(exception, *args) ⇒ Object
-
#perform(meta_id, *args) ⇒ Object
default ‘perform` method override.
-
#retry_args(meta_id, *args) ⇒ Object
Метод вызывает resque-retry когда ставить отложенное задание здесь мы убираем meta_id из аргументов.
-
#retry_identifier(*args) ⇒ Object
Метод вызывает resque-retry, когда записывает/читает число перезапусков - во время работы воркера первым аргументом передается meta_id; - во время чтения из вебинтерфейса, meta_id не передается, т.к.
-
#scheduled(queue, klass, *args) ⇒ Object
Метод вызывает resque-scheduler чтобы поставить задание в текущую очередь.
-
#unique? ⇒ Boolean
Returns true because job is unique now.
Class Method Details
.extended(base) ⇒ Object
26 27 28 29 30 31 32 33 |
# File 'lib/resque/integration/unique.rb', line 26 def self.extended(base) if base.singleton_class.include?(::Resque::Integration::Priority) raise 'Uniqueness should be enabled before Prioritness' end base.extend(::Resque::Plugins::Progress) base.singleton_class.prepend(Overrides) end |
Instance Method Details
#after_dequeue_lock(_meta_id, *args) ⇒ Object
When job is dequeued we should remove lock
146 147 148 |
# File 'lib/resque/integration/unique.rb', line 146 def after_dequeue_lock(, *args) unlock(*args) end |
#after_dequeue_meta(*args) ⇒ Object
Fail metadata if dequeue succeed
151 152 153 154 155 |
# File 'lib/resque/integration/unique.rb', line 151 def (*args) if ( = args.first) && ( = ()) .fail! end end |
#around_perform_lock(_meta_id, *args) ⇒ Object
138 139 140 141 142 143 |
# File 'lib/resque/integration/unique.rb', line 138 def around_perform_lock(, *args) yield ensure # Always clear the lock when we're done, even if there is an error. unlock(*args) end |
#before_dequeue_lock(*args) ⇒ Object
Before dequeue check if job is running
116 117 118 119 120 |
# File 'lib/resque/integration/unique.rb', line 116 def before_dequeue_lock(*args) ( = args.first) && ( = ()) && !.working? end |
#before_enqueue_lock(_meta_id, *args) ⇒ Object
Before enqueue acquire a lock
Returns boolean
134 135 136 |
# File 'lib/resque/integration/unique.rb', line 134 def before_enqueue_lock(, *args) ::Resque.redis.set(lock_id(*args), 1, ex: lock_timeout, nx: true) end |
#dequeue(*args) ⇒ Object
Dequeue unique job
177 178 179 |
# File 'lib/resque/integration/unique.rb', line 177 def dequeue(*args) ::Resque.dequeue(self, (*args), *args) end |
#enqueue_to(queue, *args) ⇒ Object
181 182 183 184 185 186 187 188 189 190 |
# File 'lib/resque/integration/unique.rb', line 181 def enqueue_to(queue, *args) = enqueued?(*args) return if .present? = ::Resque::Plugins::Meta::Metadata.new('meta_id' => (args), 'job_class' => to_s) .save ::Resque.enqueue_to(queue, self, ., *args) end |
#enqueued?(*args) ⇒ Boolean
Is job already in queue or in process?
158 159 160 161 |
# File 'lib/resque/integration/unique.rb', line 158 def enqueued?(*args) # if lock exists and timeout not exceeded ((*args)) if locked?(*args) end |
#execute ⇒ Object
106 107 108 |
# File 'lib/resque/integration/unique.rb', line 106 def execute(*) raise NotImplementedError, "You should implement `execute' method" end |
#lock_id(*args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
LockID should be independent from MetaID
89 90 91 92 93 94 |
# File 'lib/resque/integration/unique.rb', line 89 def lock_id(*args) args = args.map { |i| i.is_a?(Hash) ? i.with_indifferent_access : i } locked_args = lock_on.call(*args) encoded_args = ::Digest::SHA1.hexdigest(obj_to_string(locked_args)) "lock:#{name}-#{encoded_args}" end |
#lock_on(&block) ⇒ Object
Get or set proc returning unique arguments
79 80 81 82 83 84 85 |
# File 'lib/resque/integration/unique.rb', line 79 def lock_on(&block) if block_given? @unique = block else @unique ||= proc { |*args| args } end end |
#lock_timeout ⇒ Object
163 164 165 |
# File 'lib/resque/integration/unique.rb', line 163 def lock_timeout LOCK_TIMEOUT end |
#locked?(*args) ⇒ Boolean
Returns true if resque job is in locked state
168 169 170 171 172 173 174 |
# File 'lib/resque/integration/unique.rb', line 168 def locked?(*args) if Gem::Version.new(Redis::VERSION) < Gem::Version.new('4.3') ::Resque.redis.exists(lock_id(*args)) else ::Resque.redis.exists?(lock_id(*args)) end end |
#meta ⇒ Object
get meta object associated with job
97 98 99 |
# File 'lib/resque/integration/unique.rb', line 97 def (@meta_id) end |
#on_failure_lock(_e, _meta_id, *args) ⇒ Object
When job is failed we should remove lock
111 112 113 |
# File 'lib/resque/integration/unique.rb', line 111 def on_failure_lock(_e, , *args) unlock(*args) end |
#on_failure_retry(exception, *args) ⇒ Object
122 123 124 125 126 127 128 129 |
# File 'lib/resque/integration/unique.rb', line 122 def on_failure_retry(exception, *args) return unless defined?(super) # Keep meta_id if kill -9 (or ABRT) @meta_id = args.first if exception.is_a?(::Resque::DirtyExit) super end |
#perform(meta_id, *args) ⇒ Object
default ‘perform` method override
102 103 104 |
# File 'lib/resque/integration/unique.rb', line 102 def perform(, *args) execute(*args) end |
#retry_args(meta_id, *args) ⇒ Object
Метод вызывает resque-retry когда ставить отложенное задание здесь мы убираем meta_id из аргументов
63 64 65 |
# File 'lib/resque/integration/unique.rb', line 63 def retry_args(, *args) args end |
#retry_identifier(*args) ⇒ Object
Метод вызывает resque-retry, когда записывает/читает число перезапусков
- во время работы воркера первым аргументом передается meta_id;
- во время чтения из вебинтерфейса, meta_id не передается, т.к. она выкидывается во время перепостановки
джоба(см retry_args);
- если метод вызывается в пользовательском коде(и @meta_id отсутствует), то meta_id нельзя передавать.
72 73 74 75 76 |
# File 'lib/resque/integration/unique.rb', line 72 def retry_identifier(*args) return if args.empty? args.shift if @meta_id.is_a?(String) && !@meta_id.empty? && @meta_id == args.first lock_id(*args) end |
#scheduled(queue, klass, *args) ⇒ Object
Метод вызывает resque-scheduler чтобы поставить задание в текущую очередь
57 58 59 |
# File 'lib/resque/integration/unique.rb', line 57 def scheduled(queue, klass, *args) klass.constantize.enqueue_to(queue, *args) end |
#unique? ⇒ Boolean
Returns true because job is unique now
52 53 54 |
# File 'lib/resque/integration/unique.rb', line 52 def unique? true end |