Class: Cloudtasker::Batch::Job

Inherits:
Object
  • Object
Defined in:
lib/cloudtasker/batch/job.rb

Overview

Handle batch management

Constant Summary

JOBS_NAMESPACE =

Key namespace used for objects saved under this class.

'jobs'
STATES_NAMESPACE =
'states'
COMPLETION_STATUSES =

List of sub-job statuses taken into account when evaluating if the batch is complete.

Batch jobs go through the following states:

  • scheduled: the parent batch has enqueued a worker for the child job

  • processing: the child job is running

  • completed: the child job has completed successfully

  • errored: the child job has encountered an error and must retry

  • dead: the child job has exceeded its max number of retries

The 'dead' status counts as a completion status because a dead job will never succeed. There is no point in blocking the batch forever, so the batch eventually proceeds without it.

%w[completed dead].freeze
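
As an illustration of how these statuses gate completion, the sketch below evaluates a state hash the same way `#complete?` does further down this page. The job ids and the `batch_complete?` helper are hypothetical and exist only for this example.

```ruby
# Statuses that count toward batch completion (mirrors the constant above).
COMPLETION_STATUSES = %w[completed dead].freeze

# Hypothetical helper: true when every child status is a completion status.
def batch_complete?(state)
  state.values.all? { |status| COMPLETION_STATUSES.include?(status) }
end

# Made-up batch states keyed by child job id.
in_progress = { 'job-1' => 'completed', 'job-2' => 'errored' }
finished    = { 'job-1' => 'completed', 'job-2' => 'dead' }

puts batch_complete?(in_progress) # prints false: 'errored' jobs will be retried
puts batch_complete?(finished)    # prints true: 'dead' no longer blocks the batch
```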
IGNORED_ERRORED_CALLBACKS =

These callbacks do not need to raise errors on their own because the jobs will be either retried or dropped.

%i[on_child_error on_child_dead].freeze
BATCH_MAX_LOCK_WAIT =

The maximum number of seconds to wait for a batch state lock to be acquired.

60

Constructor Details

#initialize(worker) ⇒ Job

Build a new instance of the class.

Parameters:



# File 'lib/cloudtasker/batch/job.rb', line 102

def initialize(worker)
  @worker = worker
end

Instance Attribute Details

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



# File 'lib/cloudtasker/batch/job.rb', line 7

def worker
  @worker
end

Class Method Details

.find(worker_id) ⇒ Cloudtasker::Batch::Job?

Find a batch by id.

Parameters:

  • worker_id (String)

    The batch id.

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 53

def self.find(worker_id)
  return nil unless worker_id

  # Retrieve related worker
  payload = redis.fetch(key("#{JOBS_NAMESPACE}/#{worker_id}"))
  worker = Cloudtasker::Worker.from_hash(payload)
  return nil unless worker

  # Build batch job
  self.for(worker)
end

.for(worker) ⇒ Cloudtasker::Batch::Job

Attach a batch to a worker

Parameters:

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 85

def self.for(worker)
  # Load extension if not loaded already on the worker class
  worker.class.include(Extension::Worker) unless worker.class <= Extension::Worker

  # Add batch and parent batch to worker
  worker.batch = new(worker)
  worker.parent_batch = worker.batch.parent_batch

  # Return the batch
  worker.batch
end

.key(val) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



# File 'lib/cloudtasker/batch/job.rb', line 72

def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end
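
To make the key format concrete, here is a minimal sketch that runs without ActiveSupport. The `underscore` and `namespaced_key` helpers are hypothetical stand-ins: the first approximates `String#underscore` for simple class names only, and the second takes the class name as an argument instead of calling `to_s` on the class.

```ruby
# Simplified stand-in for ActiveSupport's String#underscore (assumption:
# it only needs to handle '::' separators and CamelCase boundaries).
def underscore(name)
  name.gsub('::', '/')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

# Hypothetical equivalent of .key for an explicit class name.
def namespaced_key(class_name, val)
  return nil if val.nil?

  [underscore(class_name), val.to_s].join('/')
end

puts namespaced_key('Cloudtasker::Batch::Job', 'jobs/abc123')
# prints "cloudtasker/batch/job/jobs/abc123"
puts namespaced_key('Cloudtasker::Batch::Job', :parent_id)
# prints "cloudtasker/batch/job/parent_id"
```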

.redis ⇒ Cloudtasker::RedisClient

Return the cloudtasker redis client

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 42

def self.redis
  @redis ||= RedisClient.new
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



# File 'lib/cloudtasker/batch/job.rb', line 132

def ==(other)
  other.is_a?(self.class) && other.batch_id == batch_id
end

#add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>

Add a worker to the batch

Parameters:

  • worker_klass (Class)

    The worker class.

  • *args (Array<any>)

    The worker arguments.

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 213

def add(worker_klass, *args)
  add_to_queue(worker.job_queue, worker_klass, *args)
end

#add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>

Add a worker to the batch using a specific queue.

Parameters:

  • queue (String, Symbol)

    The name of the queue

  • worker_klass (Class)

    The worker class.

  • *args (Array<any>)

    The worker arguments.

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 226

def add_to_queue(queue, worker_klass, *args)
  jobs << worker_klass.new(
    job_args: args,
    job_meta: { key(:parent_id) => batch_id },
    job_queue: queue
  )
end

#batch_gid ⇒ String

Return the namespaced worker id.

Returns:

  • (String)

    The worker namespaced id.



# File 'lib/cloudtasker/batch/job.rb', line 172

def batch_gid
  key("#{JOBS_NAMESPACE}/#{batch_id}")
end

#batch_id ⇒ String

Return the worker id.

Returns:

  • (String)

    The worker id.



# File 'lib/cloudtasker/batch/job.rb', line 163

def batch_id
  worker&.job_id
end

#batch_state ⇒ Hash

Return the batch state

Returns:

  • (Hash)

    The state of each child worker.



# File 'lib/cloudtasker/batch/job.rb', line 199

def batch_state
  migrate_batch_state_to_redis_hash

  redis.hgetall(batch_state_gid)
end

#batch_state_gid ⇒ String

Return the key under which the batch state is stored.

Returns:

  • (String)

    The batch state namespaced id.



# File 'lib/cloudtasker/batch/job.rb', line 181

def batch_state_gid
  key("#{STATES_NAMESPACE}/#{batch_id}")
end

#cleanup ⇒ Object

Remove all batch and sub-batch keys from Redis.



# File 'lib/cloudtasker/batch/job.rb', line 365

def cleanup
  migrate_batch_state_to_redis_hash

  # Delete child batches recursively
  redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup }

  # Delete batch redis entries
  redis.del(batch_gid)
  redis.del(batch_state_gid)
end

#complete(status = :completed) ⇒ Object

Post-perform logic. The parent batch is notified if the job is complete.



# File 'lib/cloudtasker/batch/job.rb', line 428

def complete(status = :completed)
  return true if reenqueued? || jobs.any?

  # Notify the parent batch that a child is complete
  on_complete(status) if complete?

  # Notify the parent that a batch node has completed
  parent_batch&.on_batch_node_complete(self, status)
end

#complete? ⇒ Boolean

Return true if all the child workers have completed.

Returns:

  • (Boolean)

    True if the batch is complete.



# File 'lib/cloudtasker/batch/job.rb', line 281

def complete?
  migrate_batch_state_to_redis_hash

  # Check that all child jobs have completed
  redis.hvals(batch_state_gid).all? { |e| COMPLETION_STATUSES.include?(e) }
end

#execute ⇒ Object

Execute the batch.



# File 'lib/cloudtasker/batch/job.rb', line 441

def execute
  # Update parent batch state
  parent_batch&.update_state(batch_id, :processing)

  # Perform job
  yield

  # Save batch if child jobs added
  setup if jobs.any?

  # Save parent batch if batch expanded
  parent_batch&.setup if parent_batch&.jobs&.any?

  # Complete batch
  complete(:completed)
rescue DeadWorkerError => e
  complete(:dead)
  raise(e)
rescue StandardError => e
  complete(:errored)
  raise(e)
end
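
The way `#execute` maps outcomes to statuses can be sketched in isolation. In the example below, `DeadWorkerError` is a hypothetical stand-in for the library's error class (assumed to descend from `StandardError`), and `run_with_status` is an illustrative helper that records a status where the real method calls `complete`.

```ruby
# Hypothetical stand-in for the library's dead-worker error.
class DeadWorkerError < StandardError; end

# Run the block, record the resulting status, and re-raise any error so the
# caller (the job runner, in the real library) still sees the failure.
def run_with_status(reported)
  yield
  reported << :completed
rescue DeadWorkerError
  # Must be rescued before StandardError, since it is a subclass of it.
  reported << :dead
  raise
rescue StandardError
  reported << :errored
  raise
end

reported = []
run_with_status(reported) { :ok }

begin
  run_with_status(reported) { raise DeadWorkerError, 'retries exhausted' }
rescue DeadWorkerError
  # Expected: the error is re-raised after the status is recorded.
end

begin
  run_with_status(reported) { raise ArgumentError, 'boom' }
rescue ArgumentError
  # Expected: same flow, but recorded as :errored.
end

p reported # prints [:completed, :dead, :errored]
```

The order of the `rescue` clauses matters: if the generic clause came first, a dead job would be reported as `:errored` and keep blocking the batch.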

#jobs ⇒ Array<Cloudtasker::Worker>

The list of jobs in the batch

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 190

def jobs
  @jobs ||= []
end

#key(val) ⇒ String

Return a namespaced key.

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



# File 'lib/cloudtasker/batch/job.rb', line 143

def key(val)
  self.class.key(val)
end

#migrate_batch_state_to_redis_hash ⇒ Object

This method migrates the batch state to be a Redis hash instead of a hash stored in a string key.



# File 'lib/cloudtasker/batch/job.rb', line 238

def migrate_batch_state_to_redis_hash
  return unless redis.type(batch_state_gid) == 'string'

  # Migrate batch state to Redis hash if it is still using a legacy string key
  # We acquire a lock then check again
  redis.with_lock(batch_state_gid, max_wait: BATCH_MAX_LOCK_WAIT) do
    if redis.type(batch_state_gid) == 'string'
      state = redis.fetch(batch_state_gid)
      redis.del(batch_state_gid)
      redis.hset(batch_state_gid, state) if state.any?
    end
  end
end
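
The check, lock, check-again pattern used above can be reproduced with an in-memory store. This is only a sketch under stated assumptions: a plain Hash replaces Redis, a `Mutex` replaces `redis.with_lock`, and the legacy value is assumed to be a JSON string. The `migrate` helper and the key name are hypothetical.

```ruby
require 'json'

# In-memory stand-ins (assumptions): a Hash for Redis, a Mutex for the lock.
store = { 'states/b1' => '{"job-1":"completed"}' }
lock = Mutex.new

# Hypothetical helper: convert a legacy string value into a Hash exactly once.
def migrate(store, lock, key)
  # Cheap check first: most calls return here without taking the lock.
  return unless store[key].is_a?(String)

  lock.synchronize do
    # Check again: another thread may have migrated while we waited.
    next unless store[key].is_a?(String)

    state = JSON.parse(store[key])
    store.delete(key)
    store[key] = state if state.any?
  end
end

# Several concurrent callers; only one of them performs the migration.
threads = 4.times.map { Thread.new { migrate(store, lock, 'states/b1') } }
threads.each(&:join)

puts store['states/b1'].class      # prints Hash
puts store['states/b1']['job-1']   # prints completed
```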

#on_batch_node_complete(child_batch, status = :completed) ⇒ Object

Callback invoked when any batch in the tree gets completed.

Parameters:



# File 'lib/cloudtasker/batch/job.rb', line 352

def on_batch_node_complete(child_batch, status = :completed)
  return false unless status == :completed

  # Notify the worker that a batch node worker has completed
  run_worker_callback(:on_batch_node_complete, child_batch.worker)

  # Notify the parent batch that a node is complete
  parent_batch&.on_batch_node_complete(child_batch)
end

#on_child_complete(child_batch, status = :completed) ⇒ Object

Callback invoked when a direct child batch is complete.

Parameters:



# File 'lib/cloudtasker/batch/job.rb', line 329

def on_child_complete(child_batch, status = :completed)
  # Update batch state
  update_state(child_batch.batch_id, status)

  # Notify the worker that a direct batch child worker has completed
  case status
  when :completed
    run_worker_callback(:on_child_complete, child_batch.worker)
  when :errored
    run_worker_callback(:on_child_error, child_batch.worker)
  when :dead
    run_worker_callback(:on_child_dead, child_batch.worker)
  end

  # Notify the parent batch that we are done with this batch
  on_complete if status != :errored && complete?
end

#on_complete(status = :completed) ⇒ Object

Callback invoked when the batch is complete



# File 'lib/cloudtasker/batch/job.rb', line 313

def on_complete(status = :completed)
  # Invoke worker callback
  run_worker_callback(:on_batch_complete) if status == :completed

  # Propagate event
  parent_batch&.on_child_complete(self, status)

  # The batch tree is complete. Cleanup the downstream tree.
  cleanup
end

#parent_batch ⇒ Cloudtasker::Batch::Job?

Return the parent batch, if any.

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 152

def parent_batch
  return nil unless (parent_id = worker.job_meta.get(key(:parent_id)))

  @parent_batch ||= self.class.find(parent_id)
end

#progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress

Calculate the progress of the batch.

Parameters:

  • depth (Integer) (defaults to: 0)

    The depth of calculation. Zero (default) means only immediate children will be taken into account.

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 384

def progress(depth: 0)
  depth = depth.to_i

  # Capture batch state
  state = batch_state

  # Return immediately if we do not need to go down the tree
  return BatchProgress.new(state) if depth <= 0

  # Sum batch progress of current batch and sub-batches up to the specified
  # depth
  state.to_h.reduce(BatchProgress.new(state)) do |memo, (child_id, child_status)|
    memo + (self.class.find(child_id)&.progress(depth: depth - 1) ||
      BatchProgress.new(child_id => child_status))
  end
end
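
The effect of `depth` is easier to see on a small tree. The sketch below is not the library's `BatchProgress`: `Progress` is a hypothetical class that is assumed to combine values by merging per-job statuses, and `TREE` is a made-up in-memory replacement for the Redis lookups done by `.find`.

```ruby
# Hypothetical progress object (assumption: values combine by merging
# per-job statuses, so a job is never counted twice).
class Progress
  attr_reader :state

  def initialize(state)
    @state = state
  end

  def +(other)
    Progress.new(state.merge(other.state))
  end

  def total
    state.size
  end

  def completed
    state.values.count('completed')
  end
end

# Made-up batch tree: batch id => state of its direct children.
TREE = {
  'root' => { 'a' => 'completed', 'b' => 'processing' },
  'b' => { 'b1' => 'completed', 'b2' => 'scheduled' }
}.freeze

def progress(batch_id, depth: 0)
  state = TREE.fetch(batch_id)
  return Progress.new(state) if depth <= 0

  state.reduce(Progress.new(state)) do |memo, (child_id, child_status)|
    # Children without their own batch contribute only their own status.
    child = TREE.key?(child_id) ? progress(child_id, depth: depth - 1) : Progress.new(child_id => child_status)
    memo + child
  end
end

shallow = progress('root')
deep = progress('root', depth: 1)
puts "#{shallow.completed}/#{shallow.total}" # prints 1/2
puts "#{deep.completed}/#{deep.total}"       # prints 2/4
```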

#redis ⇒ Cloudtasker::RedisClient

Return the cloudtasker redis client

Returns:



# File 'lib/cloudtasker/batch/job.rb', line 121

def redis
  self.class.redis
end

#reenqueued? ⇒ Boolean

Return true if the worker has been re-enqueued. Post-process logic should be skipped for re-enqueued jobs.

Returns:

  • (Boolean)

    True if the job was re-enqueued.



# File 'lib/cloudtasker/batch/job.rb', line 112

def reenqueued?
  worker.job_reenqueued
end

#run_worker_callback(callback, *args) ⇒ any

Run worker callback. The error and dead callbacks get silenced should they raise an error.

Parameters:

  • callback (String, Symbol)

    The callback to run.

  • *args (Array<any>)

    The callback arguments.

Returns:

  • (any)

    The callback return value



# File 'lib/cloudtasker/batch/job.rb', line 297

def run_worker_callback(callback, *args)
  worker.try(callback, *args)
rescue StandardError => e
  # There is no point in retrying jobs due to failure callbacks failing
  # Only completion callbacks will trigger a re-run of the job because
  # these do matter for batch completion
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  # Log error instead
  worker.logger.error(e)
  worker.logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
end
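
A stripped-down version of this selective rescue, runnable without the library, might look as follows. `FlakyWorker` and `run_callback` are hypothetical; `public_send` guarded by `respond_to?` approximates ActiveSupport's `try`, and an array stands in for the worker logger.

```ruby
# Callbacks whose failures are logged instead of raised (mirrors the constant).
IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze

# Hypothetical worker whose callbacks always fail.
class FlakyWorker
  def on_child_error(_child)
    raise 'error callback failed'
  end

  def on_child_complete(_child)
    raise 'completion callback failed'
  end
end

# Hypothetical helper: invoke the callback if defined, silencing failures
# only for the ignored callbacks.
def run_callback(worker, callback, *args, log:)
  worker.public_send(callback, *args) if worker.respond_to?(callback)
rescue StandardError => e
  raise unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  log << "Callback #{callback} failed: #{e.message}"
  nil
end

log = []
run_callback(FlakyWorker.new, :on_child_error, :child, log: log)

begin
  run_callback(FlakyWorker.new, :on_child_complete, :child, log: log)
rescue RuntimeError => e
  log << "re-raised: #{e.message}"
end

puts log
# prints "Callback on_child_error failed: error callback failed"
# prints "re-raised: completion callback failed"
```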

#save ⇒ Object

Save serialized version of the worker.

This is required to be able to invoke callback methods in the context of the worker (= instantiated worker) when child workers complete (success or failure).



# File 'lib/cloudtasker/batch/job.rb', line 259

def save
  redis.write(batch_gid, worker.to_h)
end

#setup ⇒ Object

Save the batch and enqueue all child workers attached to it.



# File 'lib/cloudtasker/batch/job.rb', line 404

def setup
  return true if jobs.empty?

  # Save batch
  save

  # Schedule all child workers
  jobs.each do |j|
    # Schedule the job
    j.schedule

    # Initialize the batch state unless the job has already started (and taken
    # hold of its own status)
    # The batch state is initialized only after the job is scheduled to avoid
    # having never-ending batches - which could occur if a batch was crashing
    # while enqueuing children due to a OOM error and since 'scheduled' is a
    # blocking status.
    redis.hsetnx(batch_state_gid, j.job_id, 'scheduled')
  end
end
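
The reason for using a set-if-absent write here can be shown with a plain Hash. The `hsetnx` helper below is a hypothetical in-memory imitation of the Redis HSETNX command, and the job ids are made up.

```ruby
# Hypothetical in-memory imitation of HSETNX: write only if the field is absent.
def hsetnx(hash, field, value)
  return false if hash.key?(field)

  hash[field] = value
  true
end

state = {}

# A fast child job may start, and record its own status, before the parent
# gets to initialize the state entry for it.
state['job-1'] = 'processing'

first = hsetnx(state, 'job-1', 'scheduled')  # no write: status already set
second = hsetnx(state, 'job-2', 'scheduled') # write: no status recorded yet

puts first            # prints false
puts second           # prints true
puts state['job-1']   # prints processing
puts state['job-2']   # prints scheduled
```

With an unconditional write, the parent could reset an already running (or already completed) child back to 'scheduled', which is a blocking status.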

#update_state(batch_id, status) ⇒ Object

Update the batch state.

Parameters:

  • batch_id (String)

    The batch id.

  • status (String)

    The status of the sub-batch.



# File 'lib/cloudtasker/batch/job.rb', line 269

def update_state(batch_id, status)
  migrate_batch_state_to_redis_hash

  # Update the batch state batch_id entry with the new status
  redis.hset(batch_state_gid, batch_id, status)
end