Module: Wayfarer::BatchCompletion

Defined in:
lib/wayfarer/batch_completion.rb

Overview

BatchCompletion tracks the completion of a batch of jobs. It does so by incrementing and decrementing a counter in Redis.

The counter is incremented when a job is first enqueued and decremented when a job is performed. If a job is retried, the counter is not incremented. When a job succeeds or fails and thereby exceeds its retry count, the counter is decremented.

When the counter reaches zero, garbage collection deletes the Redis keys associated with the batch.

Class Method Summary collapse

Class Method Details

.call(name, _, _, _, data) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/wayfarer/batch_completion.rb', line 23

def call(name, _, _, _, data)
  return unless (job = data[:job]).is_a?(Wayfarer::Base)

  task = job.arguments.first

  # In the case of `enqueue.active_job` middleware hasn't executed yet
  task[:redis_pool] ||= Wayfarer::Redis::Pool.instance # TODO: Test

  counter = Redis::Counter.new(task) do
    job.run_callbacks(:batch)
  ensure
    Wayfarer::GC.run(task)
  end

  handle(name, job, task, counter)
end

.handle(name, job, task, counter) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/wayfarer/batch_completion.rb', line 40

def handle(name, job, task, counter)
  case name
  when "enqueue.active_job" then counter.increment unless retry?(job)
  when "perform.active_job" then counter.decrement if succeeded?(job, task)
  when "retry_stopped.active_job" then counter.decrement
  end
end

.retry?(job) ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/wayfarer/batch_completion.rb', line 48

def retry?(job)
  job.executions > 0
end

.subscribe!Object



17
18
19
20
21
# File 'lib/wayfarer/batch_completion.rb', line 17

def subscribe!
  ActiveSupport::Notifications.subscribe("enqueue.active_job", self)
  ActiveSupport::Notifications.subscribe("perform.active_job", self)
  ActiveSupport::Notifications.subscribe("retry_stopped.active_job", self)
end

.succeeded?(job, task) ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/wayfarer/batch_completion.rb', line 52

def succeeded?(job, task)
  job.exception_executions == task[:initial_exception_executions]
end