Class: Cloudtasker::Batch::Job
- Inherits: Object (Object > Cloudtasker::Batch::Job)
- Defined in: lib/cloudtasker/batch/job.rb
Overview
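Handles batch management: a batch is attached to a worker, child workers added while the job runs are enqueued and tracked in Redis, and completion is propagated up the batch tree through worker callbacks.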
Constant Summary
- JOBS_NAMESPACE = 'jobs'
  Key namespace used for objects saved under this class.
- STATES_NAMESPACE = 'states'
- COMPLETION_STATUSES = %w[completed dead].freeze
  List of sub-job statuses taken into account when evaluating whether the batch is complete.
  Batch jobs go through the following states:
  - scheduled: the parent batch has enqueued a worker for the child job
  - processing: the child job is running
  - completed: the child job has completed successfully
  - errored: the child job has encountered an error and must retry
  - dead: the child job has exceeded its max number of retries
  The 'dead' status counts as a completion status because a dead job will never succeed. Rather than block the batch forever, the batch eventually proceeds.
- IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze
  Errors raised by these callbacks are logged instead of re-raised, because the jobs they relate to will either be retried or dropped anyway.
- BATCH_MAX_LOCK_WAIT = 60
  The maximum number of seconds to wait for a batch state lock to be acquired.
Instance Attribute Summary
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .find(worker_id) ⇒ Cloudtasker::Batch::Job?
  Find a batch by id.
- .for(worker) ⇒ Cloudtasker::Batch::Job
  Attach a batch to a worker.
- .key(val) ⇒ String
  Return a namespaced key.
- .redis ⇒ Cloudtasker::RedisClient
  Return the cloudtasker redis client.
Instance Method Summary
- #==(other) ⇒ Boolean
  Equality operator.
- #add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
  Add a worker to the batch.
- #add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
  Add a worker to the batch using a specific queue.
- #batch_gid ⇒ String
  Return the namespaced worker id.
- #batch_id ⇒ String
  Return the worker id.
- #batch_state ⇒ Hash
  Return the batch state.
- #batch_state_gid ⇒ String
  Return the key under which the batch state is stored.
- #cleanup ⇒ Object
  Remove all batch and sub-batch keys from Redis.
- #complete(status = :completed) ⇒ Object
  Post-perform logic.
- #complete? ⇒ Boolean
  Return true if all the child workers have completed.
- #execute ⇒ Object
  Execute the batch.
- #initialize(worker) ⇒ Job (constructor)
  Build a new instance of the class.
- #jobs ⇒ Array<Cloudtasker::Worker>
  The list of jobs in the batch.
- #key(val) ⇒ String
  Return a namespaced key.
- #migrate_batch_state_to_redis_hash ⇒ Object
  This method migrates the batch state to be a Redis hash instead of a hash stored in a string key.
- #on_batch_node_complete(child_batch, status = :completed) ⇒ Object
  Callback invoked when any batch in the tree gets completed.
- #on_child_complete(child_batch, status = :completed) ⇒ Object
  Callback invoked when a direct child batch is complete.
- #on_complete(status = :completed) ⇒ Object
  Callback invoked when the batch is complete.
- #parent_batch ⇒ Cloudtasker::Batch::Job?
  Return the parent batch, if any.
- #progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress
  Calculate the progress of the batch.
- #redis ⇒ Cloudtasker::RedisClient
  Return the cloudtasker redis client.
- #reenqueued? ⇒ Boolean
  Return true if the worker has been re-enqueued.
- #run_worker_callback(callback, *args) ⇒ any
  Run worker callback.
- #save ⇒ Object
  Save serialized version of the worker.
- #setup ⇒ Object
  Save the batch and enqueue all child workers attached to it.
- #update_state(batch_id, status) ⇒ Object
  Update the batch state.
Constructor Details
#initialize(worker) ⇒ Job
Build a new instance of the class.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 102
def initialize(worker)
  @worker = worker
end
```
Instance Attribute Details
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 7
def worker
  @worker
end
```
Class Method Details
.find(worker_id) ⇒ Cloudtasker::Batch::Job?
Find a batch by id.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 53
def self.find(worker_id)
  return nil unless worker_id

  # Retrieve related worker
  payload = redis.fetch(key("#{JOBS_NAMESPACE}/#{worker_id}"))
  worker = Cloudtasker::Worker.from_hash(payload)
  return nil unless worker

  # Build batch job
  self.for(worker)
end
```
.for(worker) ⇒ Cloudtasker::Batch::Job
Attach a batch to a worker.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 85
def self.for(worker)
  # Load extension if not loaded already on the worker class
  worker.class.include(Extension::Worker) unless worker.class <= Extension::Worker

  # Add batch and parent batch to worker
  worker.batch = new(worker)
  worker.parent_batch = worker.batch.parent_batch

  # Return the batch
  worker.batch
end
```
.key(val) ⇒ String
Return a namespaced key.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 72
def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end
```
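To make the key layout concrete, here is a minimal sketch of what .key produces, assuming the class name is underscored the way ActiveSupport's String#underscore does it. ActiveSupport is not loaded here, so the hypothetical underscore helper below only handles simple CamelCase constant paths; namespaced_key is likewise an illustrative name, not part of Cloudtasker.

```ruby
# Hypothetical stand-in for ActiveSupport's String#underscore. It only
# handles simple CamelCase constant paths such as "Cloudtasker::Batch::Job".
def underscore(name)
  name.gsub('::', '/')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

# Mirrors .key: nil stays nil, anything else is prefixed with the class path.
def namespaced_key(class_name, val)
  return nil if val.nil?

  [underscore(class_name), val.to_s].join('/')
end

# batch_gid and batch_state_gid build on keys like these.
puts namespaced_key('Cloudtasker::Batch::Job', 'jobs/abc123')
puts namespaced_key('Cloudtasker::Batch::Job', 'states/abc123')
```

Under this assumption, the serialized worker of batch abc123 lives under cloudtasker/batch/job/jobs/abc123 and its child states under cloudtasker/batch/job/states/abc123.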
.redis ⇒ Cloudtasker::RedisClient
Return the Cloudtasker Redis client.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 42
def self.redis
  @redis ||= RedisClient.new
end
```
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 132
def ==(other)
  other.is_a?(self.class) && other.batch_id == batch_id
end
```
#add(worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 213
def add(worker_klass, *args)
  add_to_queue(worker.job_queue, worker_klass, *args)
end
```
#add_to_queue(queue, worker_klass, *args) ⇒ Array<Cloudtasker::Worker>
Add a worker to the batch using a specific queue.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 226
def add_to_queue(queue, worker_klass, *args)
  jobs << worker_klass.new(
    job_args: args,
    job_meta: { key(:parent_id) => batch_id },
    job_queue: queue
  )
end
```
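The following sketch mirrors how #add and #add_to_queue tag each child with the parent batch id in its job metadata, which is what later lets a child find its parent. ChildJob, ParentBatch, ImportRowWorker and the hard-coded PARENT_ID_KEY are hypothetical simplifications; in Cloudtasker the children are worker instances built by worker_klass.new and the key comes from key(:parent_id).

```ruby
# Hypothetical stand-in for a Cloudtasker worker instance.
ChildJob = Struct.new(:worker_klass, :job_args, :job_meta, :job_queue, keyword_init: true)

# Hypothetical simplified batch: only the parts needed to build children.
class ParentBatch
  # Assumed value of key(:parent_id) for Cloudtasker::Batch::Job.
  PARENT_ID_KEY = 'cloudtasker/batch/job/parent_id'

  attr_reader :batch_id, :job_queue

  def initialize(batch_id, job_queue)
    @batch_id = batch_id
    @job_queue = job_queue
  end

  def jobs
    @jobs ||= []
  end

  # Like #add: children inherit the parent's queue.
  def add(worker_klass, *args)
    add_to_queue(job_queue, worker_klass, *args)
  end

  # Like #add_to_queue: record the parent id in the child's metadata.
  # Array#<< returns the array, hence the Array return type.
  def add_to_queue(queue, worker_klass, *args)
    jobs << ChildJob.new(
      worker_klass: worker_klass,
      job_args: args,
      job_meta: { PARENT_ID_KEY => batch_id },
      job_queue: queue
    )
  end
end

batch = ParentBatch.new('parent-1', 'default')
batch.add('ImportRowWorker', 1, 'a')
batch.add_to_queue('critical', 'ImportRowWorker', 2, 'b')
```

Nothing is enqueued at this point: children only accumulate in jobs until #setup schedules them.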
#batch_gid ⇒ String
Return the namespaced worker id.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 172
def batch_gid
  key("#{JOBS_NAMESPACE}/#{batch_id}")
end
```
#batch_id ⇒ String
Return the worker id.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 163
def batch_id
  worker&.job_id
end
```
#batch_state ⇒ Hash
Return the batch state.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 199
def batch_state
  migrate_batch_state_to_redis_hash

  redis.hgetall(batch_state_gid)
end
```
#batch_state_gid ⇒ String
Return the key under which the batch state is stored.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 181
def batch_state_gid
  key("#{STATES_NAMESPACE}/#{batch_id}")
end
```
#cleanup ⇒ Object
Remove all batch and sub-batch keys from Redis.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 365
def cleanup
  migrate_batch_state_to_redis_hash

  # Delete child batches recursively
  redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup }

  # Delete batch redis entries
  redis.del(batch_gid)
  redis.del(batch_state_gid)
end
```
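The recursion in #cleanup only descends into children that were themselves saved as batches, since find returns nil for plain child jobs. A sketch against a plain Hash standing in for Redis (an assumption: keys map directly to Ruby values, and the unprefixed job_key/state_key helpers are hypothetical) shows which entries disappear.

```ruby
# Hypothetical key helpers; the real keys are namespaced through #key.
def job_key(id)
  "jobs/#{id}"
end

def state_key(id)
  "states/#{id}"
end

# Mirrors #cleanup against a plain Hash standing in for Redis.
def cleanup(store, id)
  (store[state_key(id)] || {}).each_key do |child_id|
    # Like find(id)&.cleanup: only children saved as batches are followed.
    cleanup(store, child_id) if store.key?(job_key(child_id))
  end

  store.delete(job_key(id))
  store.delete(state_key(id))
end

store = {
  'jobs/root' => { 'worker' => 'ParentWorker' },
  'states/root' => { 'a' => 'completed', 'b' => 'completed' },
  'jobs/a' => { 'worker' => 'ChildWorker' },
  'states/a' => { 'a1' => 'completed' },
  'jobs/other' => { 'worker' => 'UnrelatedWorker' }
}
cleanup(store, 'root')
```

Only the unrelated entry survives: the root batch, the sub-batch a and its state are removed, while the plain jobs b and a1 never had entries of their own.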
#complete(status = :completed) ⇒ Object
Post-perform logic. The parent batch is notified if the job is complete.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 428
def complete(status = :completed)
  return true if reenqueued? || jobs.any?

  # Notify the parent batch that a child is complete
  on_complete(status) if complete?

  # Notify the parent that a batch node has completed
  parent_batch&.on_batch_node_complete(self, status)
end
```
#complete? ⇒ Boolean
Return true if all the child workers have completed.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 281
def complete?
  migrate_batch_state_to_redis_hash

  # Check that all child jobs have completed
  redis.hvals(batch_state_gid).all? { |e| COMPLETION_STATUSES.include?(e) }
end
```
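The completion rule reduces to a single Enumerable#all? over the stored child statuses. This sketch replaces redis.hvals(batch_state_gid) with a plain Hash; note that all? returns true for an empty collection, so a batch with no recorded children is reported as complete.

```ruby
COMPLETION_STATUSES = %w[completed dead].freeze

# Mirrors #complete?, with a plain Hash in place of redis.hvals(batch_state_gid).
def complete?(state)
  state.values.all? { |status| COMPLETION_STATUSES.include?(status) }
end

complete?({ 'j1' => 'completed', 'j2' => 'dead' })    # every child is finished
complete?({ 'j1' => 'completed', 'j2' => 'errored' }) # j2 may still be retried
```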
#execute ⇒ Object
Execute the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 441
def execute
  # Update parent batch state
  parent_batch&.update_state(batch_id, :processing)

  # Perform job
  yield

  # Save batch if child jobs added
  setup if jobs.any?

  # Save parent batch if batch expanded
  parent_batch&.setup if parent_batch&.jobs&.any?

  # Complete batch
  complete(:completed)
rescue DeadWorkerError => e
  complete(:dead)
  raise(e)
rescue StandardError => e
  complete(:errored)
  raise(e)
end
```
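The status flow of #execute can be traced with a stripped-down sketch that records events instead of touching Redis. TinyBatch and this DeadWorkerError are hypothetical stand-ins, and the setup calls are omitted. Because DeadWorkerError subclasses StandardError here, its rescue clause has to come first, as it does in #execute.

```ruby
# Hypothetical stand-in for Cloudtasker's DeadWorkerError.
class DeadWorkerError < StandardError; end

# Records the status each run ends with, mirroring #execute: mark the job as
# processing, run it, then complete with :completed, :dead or :errored and
# re-raise on failure so the job itself still fails.
class TinyBatch
  attr_reader :events

  def initialize
    @events = []
  end

  def execute
    @events << :processing
    yield
    complete(:completed)
  rescue DeadWorkerError => e
    complete(:dead)
    raise(e)
  rescue StandardError => e
    complete(:errored)
    raise(e)
  end

  private

  def complete(status)
    @events << status
  end
end

ok = TinyBatch.new
ok.execute { :done }
```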
#jobs ⇒ Array<Cloudtasker::Worker>
The list of jobs in the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 190
def jobs
  @jobs ||= []
end
```
#key(val) ⇒ String
Return a namespaced key.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 143
def key(val)
  self.class.key(val)
end
```
#migrate_batch_state_to_redis_hash ⇒ Object
This method migrates the batch state to be a Redis hash instead of a hash stored in a string key.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 238
def migrate_batch_state_to_redis_hash
  return unless redis.type(batch_state_gid) == 'string'

  # Migrate batch state to Redis hash if it is still using a legacy string key
  # We acquire a lock then check again
  redis.with_lock(batch_state_gid, max_wait: BATCH_MAX_LOCK_WAIT) do
    if redis.type(batch_state_gid) == 'string'
      state = redis.fetch(batch_state_gid)
      redis.del(batch_state_gid)
      redis.hset(batch_state_gid, state) if state.any?
    end
  end
end
```
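The migration uses a check, lock, check-again pattern: the first type check avoids taking the lock on already-migrated keys, and the second one inside the lock handles a concurrent process that migrated the key while this one was waiting. The sketch below uses a hypothetical FakeRedis whose with_lock is a process-local Mutex; the real RedisClient#with_lock is assumed to lock through Redis and to honor max_wait.

```ruby
# Hypothetical in-memory stand-in for the Redis client. Legacy "string" keys
# hold a whole Ruby Hash; "hash" keys hold field => value pairs.
class FakeRedis
  def initialize
    @strings = {}
    @hashes = {}
    @mutex = Mutex.new
  end

  def write_legacy(key, hash)
    @strings[key] = hash
  end

  def type(key)
    return 'string' if @strings.key?(key)
    return 'hash' if @hashes.key?(key)

    'none'
  end

  def fetch(key)
    @strings[key]
  end

  def del(key)
    @strings.delete(key)
    @hashes.delete(key)
  end

  def hset(key, fields)
    (@hashes[key] ||= {}).merge!(fields)
  end

  def hgetall(key)
    @hashes.fetch(key, {}).dup
  end

  # Process-local lock; max_wait is accepted but ignored in this sketch.
  def with_lock(_key, max_wait: nil)
    @mutex.synchronize { yield }
  end
end

# Mirrors #migrate_batch_state_to_redis_hash: cheap check, lock, check again.
def migrate(redis, key)
  return unless redis.type(key) == 'string'

  redis.with_lock(key, max_wait: 60) do
    if redis.type(key) == 'string'
      state = redis.fetch(key)
      redis.del(key)
      redis.hset(key, state) if state.any?
    end
  end
end

redis = FakeRedis.new
redis.write_legacy('states/b1', { 'j1' => 'completed', 'j2' => 'scheduled' })
migrate(redis, 'states/b1')
```

An empty legacy state is simply deleted, since there is nothing to write into the new hash.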
#on_batch_node_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when any batch in the tree gets completed.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 352
def on_batch_node_complete(child_batch, status = :completed)
  return false unless status == :completed

  # Notify the worker that a batch node worker has completed
  run_worker_callback(:on_batch_node_complete, child_batch.worker)

  # Notify the parent batch that a node is complete
  parent_batch&.on_batch_node_complete(child_batch)
end
```
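Since #complete calls parent_batch&.on_batch_node_complete(self, status) and each ancestor forwards the same child upward, a successful completion anywhere in the tree reaches every ancestor, while other statuses stop at the first one. The hypothetical Node class below records the notifications instead of running worker callbacks.

```ruby
# Hypothetical minimal batch node: records callbacks instead of running workers.
class Node
  attr_reader :name, :parent, :seen

  def initialize(name, parent = nil)
    @name = name
    @parent = parent
    @seen = []
  end

  # Mirrors #on_batch_node_complete: only successful completions propagate,
  # and every ancestor is told about the same completed node.
  def on_batch_node_complete(child, status = :completed)
    return false unless status == :completed

    @seen << child.name
    parent&.on_batch_node_complete(child)
  end
end

root = Node.new('root')
mid = Node.new('mid', root)
leaf = Node.new('leaf', mid)

# What leaf's #complete would trigger on success, then mid failing.
leaf.parent.on_batch_node_complete(leaf)
mid.parent.on_batch_node_complete(mid, :errored)
```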
#on_child_complete(child_batch, status = :completed) ⇒ Object
Callback invoked when a direct child batch is complete.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 329
def on_child_complete(child_batch, status = :completed)
  # Update batch state
  update_state(child_batch.batch_id, status)

  # Notify the worker that a direct batch child worker has completed
  case status
  when :completed
    run_worker_callback(:on_child_complete, child_batch.worker)
  when :errored
    run_worker_callback(:on_child_error, child_batch.worker)
  when :dead
    run_worker_callback(:on_child_dead, child_batch.worker)
  end

  # Notify the parent batch that we are done with this batch
  on_complete if status != :errored && complete?
end
```
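The two decisions #on_child_complete makes (which worker callback to run, and whether the batch itself is now complete) can be isolated in a small sketch over a plain state Hash. CHILD_CALLBACKS and handle_child_status are hypothetical names. Because an errored child is never a completion status, the status != :errored guard appears to act as a short-circuit that skips the Redis read rather than changing the outcome.

```ruby
COMPLETION_STATUSES = %w[completed dead].freeze

# Hypothetical table matching the case statement in #on_child_complete.
CHILD_CALLBACKS = {
  completed: :on_child_complete,
  errored: :on_child_error,
  dead: :on_child_dead
}.freeze

# Returns [callback to run, whether on_complete would fire].
def handle_child_status(state, child_id, status)
  state[child_id] = status.to_s # like update_state
  callback = CHILD_CALLBACKS[status]
  all_done = state.values.all? { |s| COMPLETION_STATUSES.include?(s) }
  [callback, status != :errored && all_done]
end

state = { 'a' => 'processing', 'b' => 'completed' }
first = handle_child_status(state, 'a', :errored)
second = handle_child_status(state, 'a', :dead)
```

A dead child still lets the batch finish, which matches the COMPLETION_STATUSES rationale above.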
#on_complete(status = :completed) ⇒ Object
Callback invoked when the batch is complete.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 313
def on_complete(status = :completed)
  # Invoke worker callback
  run_worker_callback(:on_batch_complete) if status == :completed

  # Propagate event
  parent_batch&.on_child_complete(self, status)

  # The batch tree is complete. Cleanup the downstream tree.
  cleanup
end
```
#parent_batch ⇒ Cloudtasker::Batch::Job?
Return the parent batch, if any.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 152
def parent_batch
  return nil unless (parent_id = worker..get(key(:parent_id)))

  @parent_batch ||= self.class.find(parent_id)
end
```
#progress(depth: 0) ⇒ Cloudtasker::Batch::BatchProgress
Calculate the progress of the batch.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 384
def progress(depth: 0)
  depth = depth.to_i

  # Capture batch state
  state = batch_state

  # Return immediately if we do not need to go down the tree
  return BatchProgress.new(state) if depth <= 0

  # Sum batch progress of current batch and sub-batches up to the specified
  # depth
  state.to_h.reduce(BatchProgress.new(state)) do |memo, (child_id, child_status)|
    memo + (self.class.find(child_id)&.progress(depth: depth - 1) ||
      BatchProgress.new(child_id => child_status))
  end
end
```
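The depth parameter controls how far down the tree the progress calculation walks. The sketch below assumes, as the child_id => child_status argument suggests, that progress objects wrap a job id => status hash and that + merges them by job id; Progress is a hypothetical stand-in for BatchProgress, and a child counts as a sub-batch here when it has its own state entry (the real code checks whether find returns a batch).

```ruby
# Hypothetical stand-in for BatchProgress: wraps a job_id => status hash.
# Assumption: combining two progress objects merges their states by job id.
class Progress
  attr_reader :state

  def initialize(state = {})
    @state = state
  end

  def +(other)
    Progress.new(state.merge(other.state))
  end

  def total
    state.size
  end

  def count(status)
    state.count { |_id, s| s == status }
  end
end

# states maps batch_id => { child_id => status }. A child without an entry of
# its own is treated as a plain job rather than a sub-batch.
def progress(states, batch_id, depth: 0)
  state = states.fetch(batch_id, {})
  return Progress.new(state) if depth <= 0

  state.reduce(Progress.new(state)) do |memo, (child_id, child_status)|
    sub = states.key?(child_id) ? progress(states, child_id, depth: depth - 1) : nil
    memo + (sub || Progress.new(child_id => child_status))
  end
end

states = {
  'root' => { 'a' => 'processing', 'b' => 'completed' },
  'a' => { 'a1' => 'completed', 'a2' => 'scheduled' }
}
flat = progress(states, 'root')
deep = progress(states, 'root', depth: 1)
```

With depth 0 only the two direct children are counted; with depth 1 the sub-batch's own children are included as well.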
#redis ⇒ Cloudtasker::RedisClient
Return the Cloudtasker Redis client.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 121
def redis
  self.class.redis
end
```
#reenqueued? ⇒ Boolean
Return true if the worker has been re-enqueued. Post-process logic should be skipped for re-enqueued jobs.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 112
def reenqueued?
  worker.job_reenqueued
end
```
#run_worker_callback(callback, *args) ⇒ any
Run worker callback. The error and dead callbacks get silenced should they raise an error.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 297
def run_worker_callback(callback, *args)
  worker.try(callback, *args)
rescue StandardError => e
  # There is no point in retrying jobs due to failure callbacks failing
  # Only completion callbacks will trigger a re-run of the job because
  # these do matter for batch completion
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  # Log error instead
  worker.logger.error(e)
  worker.logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
end
```
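Here is a plain-Ruby sketch of the error-silencing rule. Object#try comes from ActiveSupport, so respond_to? plus public_send is used as an approximation for public methods; FlakyWorker and ArrayLogger are hypothetical. Failures in on_child_error and on_child_dead are logged, while a failing completion callback still raises so the job gets retried.

```ruby
IGNORED_ERRORED_CALLBACKS = %i[on_child_error on_child_dead].freeze

# Hypothetical logger that just collects messages.
class ArrayLogger
  attr_reader :messages

  def initialize
    @messages = []
  end

  def error(msg)
    @messages << msg.to_s
  end
end

# Hypothetical worker whose callbacks both raise.
class FlakyWorker
  def on_child_error(_child)
    raise 'error callback failed'
  end

  def on_child_complete(_child)
    raise 'completion callback failed'
  end
end

# Mirrors #run_worker_callback; missing callbacks are skipped like Object#try.
def run_worker_callback(worker, logger, callback, *args)
  worker.public_send(callback, *args) if worker.respond_to?(callback)
rescue StandardError => e
  raise(e) unless IGNORED_ERRORED_CALLBACKS.include?(callback)

  logger.error(e)
  logger.error("Callback #{callback} failed to run. Skipping to preserve error flow.")
end

log = ArrayLogger.new
worker = FlakyWorker.new
run_worker_callback(worker, log, :on_child_error, :child) # logged, not raised
```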
#save ⇒ Object
Save serialized version of the worker.
This is required so that callback methods can be invoked in the context of the worker (i.e. an instantiated worker) when child workers complete, whether they succeed or fail.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 259
def save
  redis.write(batch_gid, worker.to_h)
end
```
#setup ⇒ Object
Save the batch and enqueue all child workers attached to it.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 404
def setup
  return true if jobs.empty?

  # Save batch
  save

  # Schedule all child workers
  jobs.each do |j|
    # Schedule the job
    j.schedule

    # Initialize the batch state unless the job has already started (and taken
    # hold of its own status)
    # The batch state is initialized only after the job is scheduled to avoid
    # having never-ending batches - which could occur if a batch was crashing
    # while enqueuing children due to a OOM error and since 'scheduled' is a
    # blocking status.
    redis.hsetnx(batch_state_gid, j.job_id, 'scheduled')
  end
end
```
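Writing 'scheduled' with HSETNX after scheduling means a child that starts quickly and records its own status first is not overwritten. The sketch below emulates HSETNX (set the field only if it is absent) on a plain Hash; the hsetnx and setup helpers and the scheduler lambda are hypothetical, with the lambda simulating a child that starts before the parent records it.

```ruby
# Hypothetical HSETNX emulation: set the field only if absent.
def hsetnx(hash, field, value)
  return false if hash.key?(field)

  hash[field] = value
  true
end

# Mirrors the loop in #setup: schedule each child first, then record
# 'scheduled' unless the child already reported its own status.
def setup(state, job_ids, scheduler)
  job_ids.each do |job_id|
    scheduler.call(job_id)
    hsetnx(state, job_id, 'scheduled')
  end
end

state = {}
fast_start = lambda do |job_id|
  # Simulate job-1 starting (and marking itself processing) immediately.
  state[job_id] = 'processing' if job_id == 'job-1'
end
setup(state, %w[job-1 job-2], fast_start)
```

job-1 keeps its 'processing' status, while job-2, which had not started, is recorded as 'scheduled'.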
#update_state(batch_id, status) ⇒ Object
Update the batch state.
```ruby
# File 'lib/cloudtasker/batch/job.rb', line 269
def update_state(batch_id, status)
  migrate_batch_state_to_redis_hash

  # Update the batch state batch_id entry with the new status
  redis.hset(batch_state_gid, batch_id, status)
end
```