Module: Wayfarer::BatchCompletion
- Defined in: lib/wayfarer/batch_completion.rb
Overview
BatchCompletion tracks the completion of a batch of jobs. It does so by incrementing and decrementing a counter in Redis.
The counter is incremented when a job is first enqueued. Re-enqueueing a job for a retry does not increment it again. The counter is decremented when a job is performed successfully, or when a job fails and has exhausted its retries.
When the counter reaches zero, garbage collection deletes the Redis keys associated with the batch.
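The counting scheme described above can be sketched without Redis. The following is a minimal in-memory illustration, not Wayfarer's implementation: `BatchCounterSketch` is a hypothetical class, and the assumption that the block given to the counter runs when the count reaches zero is inferred from how `.call` constructs its counter.

```ruby
# In-memory stand-in for the Redis-backed counter
# (hypothetical class, not part of Wayfarer).
class BatchCounterSketch
  attr_reader :value

  # The block is assumed to run when the counter reaches zero.
  def initialize(&on_zero)
    @value = 0
    @on_zero = on_zero
  end

  def increment
    @value += 1
  end

  def decrement
    @value -= 1
    @on_zero&.call if @value.zero?
    @value
  end
end

completions = 0
counter = BatchCounterSketch.new { completions += 1 }

counter.increment # first job enqueued
counter.increment # second job enqueued
counter.decrement # first job succeeded
counter.decrement # second job exhausted its retries; the batch is complete
```

In this sketch the completion block fires exactly once, on the final decrement. A real counter shared between worker processes would also need atomic increments and decrements, which is presumably one reason the module keeps it in Redis.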
Class Method Summary
- .call(name, _, _, _, data) ⇒ Object
- .handle(name, job, task, counter) ⇒ Object
- .retry?(job) ⇒ Boolean
- .subscribe! ⇒ Object
- .succeeded?(job, task) ⇒ Boolean
Class Method Details
.call(name, _, _, _, data) ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 23

    def call(name, _, _, _, data)
      return unless (job = data[:job]).is_a?(Wayfarer::Base)

      task = job.arguments.first

      # In the case of `enqueue.active_job` middleware hasn't executed yet
      task[:redis_pool] ||= Wayfarer::Redis::Pool.instance # TODO: Test

      counter = Redis::Counter.new(task) do
        job.run_callbacks(:batch)
      ensure
        Wayfarer::GC.run(task)
      end

      handle(name, job, task, counter)
    end
.handle(name, job, task, counter) ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 40

    def handle(name, job, task, counter)
      case name
      when "enqueue.active_job" then counter.increment unless retry?(job)
      when "perform.active_job" then counter.decrement if succeeded?(job, task)
      when "retry_stopped.active_job" then counter.decrement
      end
    end
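To see how this dispatch nets out over a job's lifetime, the sketch below traces the events for a job that fails once, is retried, and then succeeds. `counter_delta` is a hypothetical helper written for this illustration. Its `executions` and `succeeded` arguments stand in for the results of `retry?(job)` and `succeeded?(job, task)`, and the event order is an assumption about a typical retry.

```ruby
# Returns the change applied to the batch counter for one notification.
# Hypothetical helper: `executions` and `succeeded` replace the real
# retry?(job) and succeeded?(job, task) checks.
def counter_delta(name, executions:, succeeded: false)
  case name
  when "enqueue.active_job" then executions.zero? ? 1 : 0
  when "perform.active_job" then succeeded ? -1 : 0
  when "retry_stopped.active_job" then -1
  else 0
  end
end

# A job that fails once, is retried, and then succeeds.
events = [
  ["enqueue.active_job", { executions: 0 }],                   # first enqueue
  ["perform.active_job", { executions: 1, succeeded: false }], # failed attempt
  ["enqueue.active_job", { executions: 1 }],                   # retry enqueue
  ["perform.active_job", { executions: 2, succeeded: true }]   # successful attempt
]

deltas = events.map { |name, state| counter_delta(name, **state) }
total = deltas.sum
```

Only the first enqueue and the successful perform change the counter, so the job contributes a net zero once it has finished.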
.retry?(job) ⇒ Boolean
    # File 'lib/wayfarer/batch_completion.rb', line 48

    def retry?(job)
      job.executions > 0
    end
.subscribe! ⇒ Object
    # File 'lib/wayfarer/batch_completion.rb', line 17

    def subscribe!
      ActiveSupport::Notifications.subscribe("enqueue.active_job", self)
      ActiveSupport::Notifications.subscribe("perform.active_job", self)
      ActiveSupport::Notifications.subscribe("retry_stopped.active_job", self)
    end
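Passing `self` as the subscriber works because the module responds to `call`, which is why `.call` takes five arguments and ignores three of them. The sketch below illustrates that pattern with a tiny stand-in bus so that it runs without Rails. `NotifierSketch` and `RecordingSubscriber` are hypothetical names, and the stand-in only imitates the five-argument callback shape rather than reproducing ActiveSupport's behavior.

```ruby
# Minimal stand-in for a notification bus
# (hypothetical; this is not ActiveSupport::Notifications).
class NotifierSketch
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(name, callable)
    @subscribers[name] << callable
  end

  # Calls each subscriber with (name, started, finished, id, payload).
  def publish(name, payload)
    now = Time.now
    @subscribers[name].each do |subscriber|
      subscriber.call(name, now, now, "sketch-id", payload)
    end
  end
end

# A module with a singleton `call` method can be registered directly.
module RecordingSubscriber
  EVENTS = []

  def self.call(name, _started, _finished, _id, payload)
    EVENTS << [name, payload[:job]]
  end
end

notifier = NotifierSketch.new
notifier.subscribe("enqueue.active_job", RecordingSubscriber)
notifier.publish("enqueue.active_job", { job: :some_job })
notifier.publish("perform.active_job", { job: :some_job }) # no subscriber, ignored
```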
.succeeded?(job, task) ⇒ Boolean
    # File 'lib/wayfarer/batch_completion.rb', line 52

    def succeeded?(job, task)
      job.exception_executions == task[:initial_exception_executions]
    end
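The comparison treats an attempt as successful when the job's per-exception retry counts did not change while it was performed. The sketch below uses plain hashes to illustrate the idea. It assumes that `task[:initial_exception_executions]` holds a snapshot taken before the attempt, which is not shown in this file. `unchanged_since_snapshot?` and the hash key are hypothetical.

```ruby
# Plain hashes stand in for job.exception_executions.
# Hypothetical helper; the key format below is illustrative only.
def unchanged_since_snapshot?(current, snapshot)
  current == snapshot
end

snapshot = {}                            # assumed to be taken before the attempt
after_failure = { "[SketchError]" => 1 } # a handled exception was counted
after_success = {}                       # nothing was counted

failed_attempt = unchanged_since_snapshot?(after_failure, snapshot)
clean_attempt = unchanged_since_snapshot?(after_success, snapshot)
```

Under these assumptions, only `clean_attempt` is true, so only that attempt would decrement the counter in `.handle`.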