Class: DistributedJob::Job

Inherits: Object
Defined in:
lib/distributed_job/job.rb

Overview

A `DistributedJob::Job` instance allows you to keep track of a distributed job, i.e. a job which is split into multiple units running in parallel across multiple workers, using Redis.

Examples:

Creating a distributed job

distributed_job = DistributedJobClient.build(token: SecureRandom.hex)

# Add job parts and queue background jobs
distributed_job.push_each(Date.parse('2021-01-01')..Date.today) do |date, part|
  SomeBackgroundJob.perform_async(date, distributed_job.token, part)
end

distributed_job.token # can be used to query the status of the distributed job

Processing a distributed job part

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    return if distributed_job.stopped?

    # ...

    if distributed_job.done(part)
      # perform e.g. cleanup or some other job
    end
  rescue
    distributed_job.stop

    raise
  end
end

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(client:, token:, ttl: 86_400) ⇒ Job

Initializes a new distributed job.

Examples:

DistributedJobClient = DistributedJob::Client.new(redis: Redis.new)

distributed_job = DistributedJob::Job.new(client: DistributedJobClient, token: SecureRandom.hex)

# However, the preferred way to build a distributed job is:

distributed_job = DistributedJobClient.build(token: SecureRandom.hex)

Parameters:

  • client (DistributedJob::Client)

    The client instance

  • token (String)

    Some token to be used to identify the job. You can e.g. use SecureRandom.hex to generate one.

  • ttl (Integer) (defaults to: 86_400)

    The number of seconds this job will stay available in Redis. This value is used to automatically expire and clean up the job in Redis. Default is 86400, i.e. one day. The ttl is refreshed every time the job is modified in Redis.



# File 'lib/distributed_job/job.rb', line 61

def initialize(client:, token:, ttl: 86_400)
  @client = client
  @token = token
  @ttl = ttl
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/distributed_job/job.rb', line 40

def client
  @client
end

#token ⇒ Object (readonly)

Returns the value of attribute token.



# File 'lib/distributed_job/job.rb', line 40

def token
  @token
end

#ttl ⇒ Object (readonly)

Returns the value of attribute ttl.



# File 'lib/distributed_job/job.rb', line 40

def ttl
  @ttl
end

Instance Method Details

#count ⇒ Object

Returns the number of pushed parts which are not finished.

Examples:

distributed_job.count # => e.g. 8


# File 'lib/distributed_job/job.rb', line 203

def count
  redis.scard("#{redis_key}:parts")
end

#done(part) ⇒ Object

Removes the specified part from the distributed job's set of unfinished parts. Call this method after the respective job part has been successfully processed, i.e. finished.

Examples:

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    # ...

    distributed_job.done(part)
  end
end

Parameters:

  • part (String)

    The job part



# File 'lib/distributed_job/job.rb', line 174

def done(part)
  @done_script ||= <<~SCRIPT
    local key, part, ttl = ARGV[1], ARGV[2], tonumber(ARGV[3])

    if redis.call('srem', key .. ':parts', part) == 0 then return end

    redis.call('expire', key .. ':parts', ttl)
    redis.call('expire', key .. ':state', ttl)

    return redis.call('scard', key .. ':parts')
  SCRIPT

  redis.eval(@done_script, argv: [redis_key, part.to_s, ttl]) == 0 && closed?
end
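The return value semantics of `done` can be illustrated with plain Ruby, using a `Set` in place of the Redis set and assuming the job is already closed. This is an illustrative sketch, not part of the library:

```ruby
require 'set'

# Mirrors the Lua script's return value: true only when this very
# call removes the last open part and the job is closed.
def done_sketch(parts, part, closed)
  return false unless parts.delete?(part) # part was not open

  parts.empty? && closed
end

parts = Set.new(%w[1 2])
done_sketch(parts, '1', true) # => false, one part still open
done_sketch(parts, '2', true) # => true, last part just finished
```

Because only one caller can successfully remove the last part, exactly one worker sees `true` and can safely run the finalization step.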

#finished? ⇒ Boolean

Returns true if there are no more unfinished parts.

Examples:

distributed_job.finished? #=> true/false

Returns:

  • (Boolean)


# File 'lib/distributed_job/job.rb', line 212

def finished?
  closed? && count.zero?
end

#open_part?(part) ⇒ Boolean

Returns whether or not the part is in the list of open parts of the distributed job.

Returns:

  • (Boolean)

    Returns true or false



# File 'lib/distributed_job/job.rb', line 151

def open_part?(part)
  redis.sismember("#{redis_key}:parts", part.to_s)
end

#open_parts ⇒ Enumerator

Returns all parts of the distributed job which are not yet finished.

Returns:

  • (Enumerator)

    An enum which allows iterating all open parts



# File 'lib/distributed_job/job.rb', line 142

def open_parts
  redis.sscan_each("#{redis_key}:parts")
end

#push_all(enum) ⇒ Object

Pass an enum which will be iterated to obtain all the units of work of the distributed job. The values of the enum are used as the names of the parts, so values listed multiple times (duplicates) are only added once to the distributed job. The distributed job needs to know all parts to keep track of their overall number and status. Passing an enum is preferable to pushing the parts manually, because the distributed job must be closed before the last part is enqueued into some job queue. Otherwise the last part could be processed in the job queue before it has been pushed to Redis, in which case the last job would not know that the distributed job is finished.

Examples:

distributed_job.push_all(0..128)
distributed_job.push_all(['part1', 'part2', 'part3'])

Parameters:

  • enum (#each)

    The enum which can be iterated to get all job parts

Raises:

  • (AlreadyClosed)

    If the distributed job is already closed



# File 'lib/distributed_job/job.rb', line 128

def push_all(enum)
  raise(AlreadyClosed, 'The distributed job is already closed') if closed?

  enum.each do |part|
    push(part)
  end

  close
end

#push_each(enum) {|previous_object, previous_index.to_s| ... } ⇒ Object

Pass an enum which will be iterated to obtain all the units of work of the distributed job. The distributed job needs to know all parts to keep track of their overall number and status. Passing an enum is preferable to pushing the parts manually, because the distributed job must be closed before the last part is enqueued into some job queue. Otherwise the last part could be processed in the job queue before it has been pushed to Redis, in which case the last job would not know that the distributed job is finished.

Examples:

distributed_job.push_each(Date.parse('2021-01-01')..Date.today) do |date, part|
  # e.g. SomeBackgroundJob.perform_async(date, distributed_job.token, part)
end

ActiveRecord

distributed_job.push_each(User.select(:id).find_in_batches) do |batch, part|
  # e.g. SomeBackgroundJob.perform_async(batch.first.id, batch.last.id, distributed_job.token, part)
end

Parameters:

  • enum (#each_with_index)

    The enum which can be iterated to get all job parts

Yields:

  • (previous_object, previous_index.to_s)

Raises:

  • (AlreadyClosed)

    If the distributed job is already closed



# File 'lib/distributed_job/job.rb', line 89

def push_each(enum)
  raise(AlreadyClosed, 'The distributed job is already closed') if closed?

  previous_object = nil
  previous_index = nil

  enum.each_with_index do |current_object, current_index|
    push(current_index)

    yield(previous_object, previous_index.to_s) if previous_index

    previous_object = current_object
    previous_index = current_index
  end

  close

  yield(previous_object, previous_index.to_s) if previous_index
end
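The deferred yield in `push_each` is what guarantees the close-before-last-part ordering. A stripped-down sketch in plain Ruby (no Redis; `deferred_order` is a hypothetical name used only here) records that ordering:

```ruby
# Each element is yielded one iteration late, so the final element
# is only yielded after :close has been recorded, mirroring how
# push_each closes the job before the last part is enqueued.
def deferred_order(enum)
  order = []
  previous = nil

  enum.each do |current|
    order << [:yield, previous] if previous
    previous = current
  end

  order << :close
  order << [:yield, previous] if previous
  order
end

deferred_order(%w[a b c])
# => [[:yield, "a"], [:yield, "b"], :close, [:yield, "c"]]
```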

#stop ⇒ Object

Stops a distributed job. This is useful if an error occurred in one part, i.e. one background job, of the distributed job and you then want to stop all other not yet finished parts. Please note that only jobs which actively ask the distributed job whether or not it was stopped can be stopped.

Examples:

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    return if distributed_job.stopped?

    # ...

    distributed_job.done(part)
  rescue
    distributed_job.stop

    raise
  end
end


# File 'lib/distributed_job/job.rb', line 241

def stop
  redis.multi do |transaction|
    transaction.hset("#{redis_key}:state", 'stopped', 1)

    transaction.expire("#{redis_key}:state", ttl)
    transaction.expire("#{redis_key}:parts", ttl)
  end

  true
end

#stopped? ⇒ Boolean

Returns true when the distributed job was stopped or false otherwise.

Examples:

class SomeBackgroundJob
  def perform(whatever, token, part)
    distributed_job = DistributedJobClient.build(token: token)

    return if distributed_job.stopped?

    # ...

    distributed_job.done(part)
  rescue
    distributed_job.stop

    raise
  end
end

Returns:

  • (Boolean)


# File 'lib/distributed_job/job.rb', line 273

def stopped?
  redis.hget("#{redis_key}:state", 'stopped') == '1'
end

#total ⇒ Object

Returns the total number of pushed parts, whether finished or not.

Examples:

distributed_job.total # => e.g. 13


# File 'lib/distributed_job/job.rb', line 194

def total
  redis.hget("#{redis_key}:state", 'total').to_i
end
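`total` and `count` together allow reporting progress. The arithmetic is plain Ruby; `progress_percent` is a hypothetical helper, not part of the library — in practice its inputs would come from `distributed_job.total` and `distributed_job.count`:

```ruby
# Percentage of finished parts, given the total number of pushed
# parts and the number of still-open parts.
def progress_percent(total, open_count)
  return 100 if total.zero?

  ((total - open_count) * 100) / total
end

# e.g. 13 parts pushed in total, 8 still open
progress_percent(13, 8) # => 38
```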