Class: Simplekiq::OrchestrationExecutor
- Inherits:
-
Object
- Object
- Simplekiq::OrchestrationExecutor
- Defined in:
- lib/simplekiq/orchestration_executor.rb
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.execute(args:, job:, workflow:) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/simplekiq/orchestration_executor.rb', line 5 def self.execute(args:, job:, workflow:) if workflow.empty? Simplekiq.run_empty_callbacks(job, args: args) return end orchestration_batch = Sidekiq::Batch.new orchestration_batch.description = "#{job.class.name} Simplekiq orchestration" Simplekiq.auto_define_callbacks(orchestration_batch, args: args, job: job) orchestration_batch.jobs do Simplekiq::BatchTrackerJob.perform_async(job.class.name, orchestration_batch.bid, args) new.run_step(workflow, 0) end end |
Instance Method Details
#on_success(status, options) ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/simplekiq/orchestration_executor.rb', line 43 def on_success(status, ) return if ["step"] == ["orchestration_workflow"].length Sidekiq::Batch.new(status.parent_bid).jobs do run_step(["orchestration_workflow"], ["step"]) end end |
#run_step(workflow, step) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/simplekiq/orchestration_executor.rb', line 22 def run_step(workflow, step) *jobs = workflow.at(step) # This will never be empty because Orchestration#serialized_workflow skips inserting # a new step for in_parallel if there were no inner jobs specified. next_step = step + 1 step_batch = Sidekiq::Batch.new step_batch.description = "Simplekiq orchestrated step #{next_step}" step_batch.on( "success", self.class, {"orchestration_workflow" => workflow, "step" => next_step} ) step_batch.jobs do jobs.each do |job| Object.const_get(job["klass"]).perform_async(*job["args"]) end end end |