Class: DistributedJob

Inherits:
Object
Defined in:
lib/distributed_job.rb,
lib/distributed_job/version.rb

Overview

A distributed job instance lets you keep track of distributed jobs, i.e. jobs that are split into multiple parts which run in parallel across multiple workers. The state of each distributed job is stored in Redis.

Examples:

Creating a distributed job

distributed_job = DistributedJob.new(redis: Redis.new, token: SecureRandom.hex)

distributed_job.push_each(Date.parse('2021-01-01')..Date.today) do |date, part|
  SomeBackgroundJob.perform_async(date, distributed_job.token, part)
end

distributed_job.token # can be used to query the status of the distributed job

Processing a distributed job part

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJob.new(redis: Redis.new, token: token)

    return if distributed_job.stopped?

    # ...

    distributed_job.done(part)

    if distributed_job.finished?
      # e.g. perform cleanup or enqueue some other job
    end
  rescue
    distributed_job.stop

    raise
  end
end

Constant Summary

VERSION = '2.0.0'

Constructor Details

#initialize(redis:, token:, ttl: 86_400) ⇒ DistributedJob

Initializes a new distributed job.

Examples:

distributed_job = DistributedJob.new(redis: Redis.new, token: SecureRandom.hex)

Parameters:

  • redis (Redis)

    The Redis connection instance.

  • token (String)

    A token used to identify the job. You can, e.g., use SecureRandom.hex to generate one.

  • ttl (Integer) (defaults to: 86_400)

    The number of seconds this job stays available in Redis. This value is used to automatically expire and clean up the job in Redis. The default is 86400, i.e. one day. The ttl is reapplied every time the job is modified in Redis, so each modification resets the expiry.
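Because the ttl is reapplied on every modification, it behaves like a sliding expiry rather than a fixed deadline. The following is a minimal in-memory sketch of that behavior, not code from the gem: `SlidingExpiry`, `touch` and `expired?` are hypothetical names, and time is passed in as plain seconds instead of being tracked by Redis.

```ruby
# Hypothetical in-memory model of a key whose TTL is reapplied on every
# modification, similar in spirit to calling Redis EXPIRE after each write.
class SlidingExpiry
  attr_reader :expires_at

  def initialize(ttl:, now:)
    @ttl = ttl
    touch(now)
  end

  # Every modification resets the deadline to now + ttl.
  def touch(now)
    @expires_at = now + @ttl
  end

  def expired?(now)
    now >= @expires_at
  end
end

key = SlidingExpiry.new(ttl: 86_400, now: 0)
key.touch(50_000)     # the job is modified half a day later
key.expired?(90_000)  # => false, the deadline moved to 136_400
key.expired?(136_400) # => true
```

Assuming every modification reapplies the ttl as described above, a job is only cleaned up after going ttl seconds without any modification, so the ttl should comfortably exceed the longest expected gap between two modifications. In the source listings on this page, read-only methods such as count and stopped? do not issue EXPIRE.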


# File 'lib/distributed_job.rb', line 56

def initialize(redis:, token:, ttl: 86_400)
  @redis = redis
  @token = token
  @ttl = ttl
end

Instance Attribute Details

#redis ⇒ Object (readonly)

Returns the value of attribute redis.


# File 'lib/distributed_job.rb', line 41

def redis
  @redis
end

#token ⇒ Object (readonly)

Returns the value of attribute token.


# File 'lib/distributed_job.rb', line 41

def token
  @token
end

#ttl ⇒ Object (readonly)

Returns the value of attribute ttl.


# File 'lib/distributed_job.rb', line 41

def ttl
  @ttl
end

Instance Method Details

#count ⇒ Object

Returns the number of pushed parts that are not yet finished.

Examples:

distributed_job.count # => e.g. 8

# File 'lib/distributed_job.rb', line 158

def count
  redis.scard("#{redis_key}:parts")
end
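count and total read two different Redis keys: count is the cardinality of the set of unfinished parts, while total is a counter stored in the state hash. The sketch below models that difference in memory. `InMemoryJobState` is hypothetical, and its `push` assumes that pushing a part adds it to the set and increments total, which the private push method (not documented on this page) is presumed to do.

```ruby
require 'set'

# Hypothetical in-memory stand-in for the two Redis keys of a job:
# a set of unfinished parts (":parts") and a state hash (":state").
class InMemoryJobState
  def initialize
    @parts = Set.new
    @state = { 'total' => 0 }
  end

  # Assumption: pushing a part adds it to the set and increments "total".
  def push(part)
    @parts.add(part.to_s)
    @state['total'] += 1
  end

  # Like SREM on ":parts": a finished part leaves the set.
  def done(part)
    @parts.delete(part.to_s)
  end

  # Like SCARD on ":parts": counts unfinished parts only.
  def count
    @parts.size
  end

  # Like HGET on ":state", field "total": counts every pushed part.
  def total
    @state['total']
  end
end

state = InMemoryJobState.new
3.times { |i| state.push(i) }
state.done(0)
state.count # => 2
state.total # => 3
```

Under that assumption, `distributed_job.total - distributed_job.count` gives the number of finished parts, which is handy for progress reporting.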

#done(part) ⇒ Object

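Removes the specified part from the distributed job, i.e. from the set of unfinished parts. Call this method once the respective job part has been processed successfully. It returns true only if this call removed the last unfinished part of a closed distributed job; otherwise, including when the part had already been removed, it returns false.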

Examples:

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJob.new(redis: Redis.new, token: token)

    # ...

    distributed_job.done(part)
  end
end

Parameters:

  • part (String)

    The job part


# File 'lib/distributed_job.rb', line 129

def done(part)
  @done_script ||= <<~SCRIPT
    local key, part, ttl = ARGV[1], ARGV[2], tonumber(ARGV[3])

    if redis.call('srem', key .. ':parts', part) == 0 then return end

    redis.call('expire', key .. ':parts', ttl)
    redis.call('expire', key .. ':state', ttl)

    return redis.call('scard', key .. ':parts')
  SCRIPT

  redis.eval(@done_script, argv: [redis_script_key, part.to_s, ttl]) == 0 && closed?
end
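The return value of done follows from the Lua script: the script returns nil when SREM removed nothing and the remaining number of parts otherwise. The sketch below translates that logic into plain Ruby over an in-memory Set to show why a repeated call cannot report completion twice. `done_script` and `done` here are hypothetical helpers, `closed:` stands in for the private closed? check, and TTL handling is omitted.

```ruby
require 'set'

# Hypothetical Ruby translation of the Lua script used by #done, operating on
# an in-memory Set instead of Redis. Returns the remaining count, or nil if
# the part was not in the set (mirroring the script's bare `return`).
def done_script(parts, part)
  # SREM returned 0: the part was unknown or already done
  return nil unless parts.delete?(part)

  # (the real script also refreshes the TTL of both keys here)
  parts.size # SCARD after the removal
end

# Mirrors `redis.eval(...) == 0 && closed?` from the listing above.
def done(parts, part, closed:)
  done_script(parts, part.to_s) == 0 && closed
end

parts = Set.new(%w[0 1])
done(parts, 0, closed: true) # => false, part "1" is still open
done(parts, 1, closed: true) # => true, this call finished the last part
done(parts, 1, closed: true) # => false, a repeated call reports nothing
```

Since Redis executes a Lua script atomically, the SREM and SCARD of one call cannot interleave with another worker's call, so at most one call observes the count dropping to zero.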

#finished? ⇒ Boolean

Returns true if the distributed job has been closed, i.e. all of its parts have been pushed, and there are no more unfinished parts.

Examples:

distributed_job.finished? #=> true/false

Returns:

  • (Boolean)

# File 'lib/distributed_job.rb', line 167

def finished?
  closed? && count.zero?
end
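The closed? check matters because the set of unfinished parts can be empty before all parts have been pushed, e.g. right after the job was created. The following hypothetical `JobModel` Struct models finished? with an array of open parts and a closed flag; it is a sketch of the condition in the listing above, not the gem's class.

```ruby
# Hypothetical model of #finished?: a job only counts as finished once it has
# been closed (all parts pushed) AND no unfinished parts remain.
JobModel = Struct.new(:open_parts, :closed) do
  def finished?
    closed && open_parts.empty?
  end
end

job = JobModel.new([], false)
job.finished? # => false: no open parts yet, but more may still be pushed
job.open_parts << '0'
job.closed = true
job.finished? # => false: closed, but part "0" is still open
job.open_parts.delete('0')
job.finished? # => true
```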

#open_parts ⇒ Enumerator

Returns all parts of the distributed job which are not yet finished.

Returns:

  • (Enumerator)

    An enumerator over all unfinished parts, which iterates the set incrementally via SSCAN


# File 'lib/distributed_job.rb', line 106

def open_parts
  redis.sscan_each("#{redis_key}:parts")
end
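open_parts delegates to redis-rb's sscan_each, which walks the set with the cursor-based SSCAN command and exposes it as an Enumerator. The sketch below imitates that pattern over a plain Array so it runs without Redis; `scan_each` and its `page_size` are hypothetical, and the page size only approximates SSCAN's COUNT hint.

```ruby
# Hypothetical sketch of cursor-based iteration in the style of SSCAN: the
# collection is read in pages and exposed as a lazy Enumerator, so callers can
# stop early without reading every member first.
def scan_each(members, page_size: 2)
  Enumerator.new do |yielder|
    cursor = 0
    loop do
      page = members[cursor, page_size] || []
      page.each { |member| yielder << member }
      cursor += page_size
      # in real SSCAN, a returned cursor of "0" signals the end
      break if cursor >= members.size
    end
  end
end

parts = %w[0 1 2 3 4]
enum = scan_each(parts)
enum.first(3) # => ["0", "1", "2"]
enum.to_a     # => ["0", "1", "2", "3", "4"]
```

Because pages are fetched on demand, a call such as `distributed_job.open_parts.first(10)` should not need to read the whole set. Keep in mind that Redis documents SSCAN as possibly returning an element more than once, and parts finished during the iteration may or may not appear.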

#push_each(enum) {|previous_object, previous_index.to_s| ... } ⇒ Object

Pass an enum that iterates all units of work of the distributed job. The distributed job needs to know all of them to keep track of the overall number and status of the parts. Passing an enum is preferable to pushing the parts manually, because the distributed job must be closed before its last part is enqueued into a job queue. Otherwise, the last part could already be processed by a worker before the distributed job is closed, so that the last job would not know that the distributed job is finished.

To achieve this, push_each pushes each part before yielding it, yields one element behind the iteration, and closes the distributed job before yielding the last element. The block receives each element together with its part identifier, i.e. its index as a string.

Examples:

distributed_job.push_each(Date.parse('2021-01-01')..Date.today) do |date, part|
  # e.g. SomeBackgroundJob.perform_async(date, distributed_job.token, part)
end

ActiveRecord

distributed_job.push_each(User.select(:id).find_in_batches) do |batch, part|
  # e.g. SomeBackgroundJob.perform_async(batch.first.id, batch.last.id, distributed_job.token, part)
end

Parameters:

  • enum (#each_with_index)

    The enum which can be iterated to get all job parts

Yields:

  • (previous_object, previous_index.to_s)

# File 'lib/distributed_job.rb', line 84

def push_each(enum)
  previous_object = nil
  previous_index = nil

  enum.each_with_index do |current_object, current_index|
    push(current_index)

    yield(previous_object, previous_index.to_s) if previous_index

    previous_object = current_object
    previous_index = current_index
  end

  close

  yield(previous_object, previous_index.to_s) if previous_index
end
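The ordering guarantee of push_each is easiest to see by recording calls. The sketch below copies the documented loop into a hypothetical `RecordingJob` whose `push` and `close` only append to a log, so it runs without Redis; in the real class these methods write to Redis.

```ruby
# Hypothetical recorder that mimics the ordering of #push_each: every part is
# pushed before it is yielded, and the job is closed before the LAST element
# is yielded (i.e. before its background job can be enqueued).
class RecordingJob
  attr_reader :log

  def initialize
    @log = []
  end

  def push(part)
    @log << "push #{part}"
  end

  def close
    @log << 'close'
  end

  # Same one-element lookahead as the documented implementation.
  def push_each(enum)
    previous_object = nil
    previous_index = nil

    enum.each_with_index do |current_object, current_index|
      push(current_index)
      yield(previous_object, previous_index.to_s) if previous_index
      previous_object = current_object
      previous_index = current_index
    end

    close
    yield(previous_object, previous_index.to_s) if previous_index
  end
end

job = RecordingJob.new
job.push_each(%w[a b c]) { |object, part| job.log << "enqueue #{object} (part #{part})" }
job.log
# => ["push 0", "push 1", "enqueue a (part 0)", "push 2", "enqueue b (part 1)",
#     "close", "enqueue c (part 2)"]
```

The log shows that close runs before the last element is yielded, so by the time the final background job is enqueued, the distributed job is already closed. It also shows that the set of open parts cannot become empty before that point, because each yielded part is preceded by the push of the next one.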

#stop ⇒ Object

Stops the distributed job and always returns true. This is useful if an error occurred in some part (i.e. background job) of the distributed job and you want to stop all other parts that are not yet finished. Note that only parts which actively check whether the distributed job was stopped, e.g. via stopped?, can be stopped.

Examples:

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJob.new(redis: Redis.new, token: token)

    return if distributed_job.stopped?

    # ...

    distributed_job.done(part)
  rescue
    distributed_job.stop

    raise
  end
end

# File 'lib/distributed_job.rb', line 196

def stop
  redis.multi do
    redis.hset("#{redis_key}:state", 'stopped', 1)

    redis.expire("#{redis_key}:state", ttl)
    redis.expire("#{redis_key}:parts", ttl)
  end

  true
end
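Stopping is cooperative: stop only sets a flag in the job's state hash, and each part has to check stopped? itself. The sketch below models this sequentially with a hypothetical in-memory `CooperativeJob`; unlike the documented example it does not re-raise the error, so the simulated loop keeps running.

```ruby
# Hypothetical in-memory model of cooperative cancellation: #stop only sets a
# flag in the shared state; each part must check it itself via #stopped?.
class CooperativeJob
  def initialize
    @state = {} # stands in for the ":state" hash in Redis
  end

  def stop
    @state['stopped'] = '1' # Redis returns hash values as strings
    true
  end

  def stopped?
    @state['stopped'] == '1'
  end
end

job = CooperativeJob.new
processed = []

%w[0 1 2 3].each do |part|
  next if job.stopped? # parts that start after #stop skip their work

  begin
    raise 'boom' if part == '1' # simulate a failure in one part
    processed << part
  rescue StandardError
    job.stop # the documented example would re-raise here
  end
end

processed # => ["0"]
```

In a real deployment the parts run concurrently, so a part that passed its stopped? check before stop was called will still run to completion.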

#stopped? ⇒ Boolean

Returns true if the distributed job was stopped, false otherwise.

Examples:

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJob.new(redis: Redis.new, token: token)

    return if distributed_job.stopped?

    # ...

    distributed_job.done(part)
  rescue
    distributed_job.stop

    raise
  end
end

Returns:

  • (Boolean)

# File 'lib/distributed_job.rb', line 228

def stopped?
  redis.hget("#{redis_key}:state", 'stopped') == '1'
end

#total ⇒ Object

Returns the total number of pushed parts, regardless of whether they are finished.

Examples:

distributed_job.total # => e.g. 13

# File 'lib/distributed_job.rb', line 149

def total
  redis.hget("#{redis_key}:state", 'total').to_i
end