Class: Ductr::Pipeline
Overview
Pipelines let you declare rich data pipelines easily.
The `after` annotation defines the execution hierarchy of the steps.
`sync` and `async` define job sequences inside step methods.
`Pipeline` inherits from `Job`, which means that pipelines are enqueued like any other job. Pipelines are enqueued in the :ductr_pipelines queue.
class MyPipeline < Ductr::Pipeline
  def first_step
    sync(MyJob, 1)
    async(SomeJob) # Executed when `MyJob` is done
  end

  after :first_step
  def first_parallel_step # Returns when all three `HelloJob` are done
    async(HelloJob, :one)
    async(HelloJob, :two)
    async(HelloJob, :three)
  end

  after :first_step
  def second_parallel_step # Executed concurrently with :first_parallel_step
    async(SomeJob)
    async(SomeOtherJob)
    sync(HelloJob, :one) # Executed when `SomeJob` and `SomeOtherJob` are done
  end

  after :first_parallel_step, :second_parallel_step
  def last_step # Executed when `first_parallel_step` and `second_parallel_step` jobs are done
    sync(ByeJob)
  end
end
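The ordering implied by the `after` annotations in this example can be pictured as a small dependency graph. The sketch below is only an illustration of that ordering, not Ductr's scheduler: `STEPS` and `execution_waves` are hypothetical names, and the dependency table is copied by hand from `MyPipeline`.

```ruby
# Hypothetical model of the step dependencies declared in MyPipeline.
# Keys are step names; values are the steps listed in their `after` annotation.
STEPS = {
  first_step: [],
  first_parallel_step: [:first_step],
  second_parallel_step: [:first_step],
  last_step: %i[first_parallel_step second_parallel_step]
}.freeze

# Groups steps into "waves": each step in a wave depends only on steps from
# earlier waves, so the steps sharing a wave may run concurrently.
def execution_waves(steps)
  done = []
  waves = []
  until done.size == steps.size
    ready = steps.keys
                 .reject { |step| done.include?(step) }
                 .select { |step| (steps[step] - done).empty? }
    raise ArgumentError, "cyclic or unknown dependency" if ready.empty?

    waves << ready
    done.concat(ready)
  end
  waves
end

execution_waves(STEPS).each_with_index do |wave, index|
  puts "wave #{index + 1}: #{wave.join(', ')}"
end
```

Under this model, `first_parallel_step` and `second_parallel_step` land in the same wave, which matches the comment stating that they execute concurrently.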
You can define a pipeline with a single step by using the `after` annotation without parameters:
class MonoStepPipeline < Ductr::Pipeline
  after
  def unique_step
    async(MyJob)
    async(MyJob)
  end
end
A pipeline can inherit from another, allowing you to override steps of the parent pipeline and add new ones:
class InheritPipeline < MonoStepPipeline
  after :unique_step
  def not_that_unique
    async(MyJob)
  end
end
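To make the annotation and inheritance behaviour more concrete, here is a minimal sketch of how a class-level `after` call could be attached to the method defined right after it, and how a subclass could see its parent's steps. This is an assumption-laden toy, not the code Ductr or its annotation library uses: `ToyAnnotations`, `step_dependencies`, `all_step_dependencies`, `ToyMonoStep` and `ToyInherit` are all hypothetical names.

```ruby
# Hypothetical mixin: remembers the arguments of `after` and attaches them to
# the next method defined in the class. NOT the real annotation mechanism.
module ToyAnnotations
  # Called at class level, right before a step method definition.
  def after(*step_names)
    @pending_after = step_names
  end

  # Ruby hook invoked each time an instance method is defined on the class.
  def method_added(name)
    super
    return if @pending_after.nil?

    step_dependencies[name] = @pending_after
    @pending_after = nil
  end

  # Steps annotated directly in this class.
  def step_dependencies
    @step_dependencies ||= {}
  end

  # Steps of the parent classes merged with the ones declared here.
  def all_step_dependencies
    inherited = superclass.respond_to?(:all_step_dependencies) ? superclass.all_step_dependencies : {}
    inherited.merge(step_dependencies)
  end
end

class ToyMonoStep
  extend ToyAnnotations

  after
  def unique_step; end
end

class ToyInherit < ToyMonoStep
  after :unique_step
  def not_that_unique; end
end

p ToyMonoStep.all_step_dependencies
p ToyInherit.all_step_dependencies
```

In this toy, calling `after` without parameters records a step with no predecessors, and the subclass ends up with both the inherited step and its own.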
Instance Attribute Summary
- #runner ⇒ PipelineRunner (readonly)
  The pipeline’s runner instance.
Attributes inherited from Job
Class Method Summary
- .after ⇒ void
  Annotation to define preceding steps on a pipeline step method.
Instance Method Summary
- #async(job_class, *params) ⇒ void
  Enqueues the given job.
- #initialize ⇒ Pipeline (constructor)
  Initializes the pipeline runner.
- #run ⇒ void
  Starts the pipeline runner.
- #status=(status) ⇒ void
  Writes the pipeline’s status into Ductr’s store.
- #sync(job_class, *params) ⇒ void
  Puts the given job in the queue and waits for it to be done.
Methods inherited from Job
Methods included from JobStatus
Constructor Details
#initialize ⇒ Pipeline
Initializes the pipeline runner.
# File 'lib/ductr/pipeline.rb', line 89
def initialize(...)
  super(...)
  @runner = PipelineRunner.new(self)
end
Instance Attribute Details
#runner ⇒ PipelineRunner (readonly)
Returns the pipeline’s runner instance.
# File 'lib/ductr/pipeline.rb', line 77
def runner
  @runner
end
Class Method Details
.after ⇒ void
This method returns an undefined value.
Annotation to define preceding steps on a pipeline step method.
# File 'lib/ductr/pipeline.rb', line 72
annotable :after
Instance Method Details
#async(job_class, *params) ⇒ void
This method returns an undefined value.
Enqueues the given job.
# File 'lib/ductr/pipeline.rb', line 117
def async(job_class, *params)
  @runner.current_step.enqueue_job job_class.new(*params)
end
#run ⇒ void
This method returns an undefined value.
Starts the pipeline runner.
# File 'lib/ductr/pipeline.rb', line 84
def_delegators :@runner, :run
#status=(status) ⇒ void
This method returns an undefined value.
Writes the pipeline’s status into Ductr’s store.
# File 'lib/ductr/pipeline.rb', line 128
def status=(status)
  @status = status
  Store.write_pipeline(self)
end
#sync(job_class, *params) ⇒ void
This method returns an undefined value.
Puts the given job in the queue and waits for it to be done.
# File 'lib/ductr/pipeline.rb', line 103
def sync(job_class, *params)
  @runner.current_step.flush_jobs
  @runner.current_step.enqueue_job job_class.new(*params)
  @runner.current_step.flush_jobs
end
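The flush, enqueue, flush sequence in `sync` is what makes a job wait for earlier `async` jobs and block until it is done itself. The following toy model shows that idea with plain threads. It rests on assumptions: `ToyStep` and `ToyPipeline` are hypothetical stand-ins, jobs are lambdas that sleep, and "flushing" is modelled as joining threads, which is not necessarily how Ductr's step waits for its jobs.

```ruby
# Hypothetical stand-in for a pipeline step (not Ductr's step class).
class ToyStep
  def initialize
    @threads = []
  end

  # Starts the job in the background and keeps track of it.
  def enqueue_job(job)
    @threads << Thread.new { job.call }
  end

  # Blocks until every job enqueued so far has finished.
  def flush_jobs
    @threads.each(&:join)
    @threads.clear
  end
end

# Hypothetical pipeline mirroring the shape of #async and #sync.
class ToyPipeline
  attr_reader :log

  def initialize
    @step = ToyStep.new
    @log = Queue.new # thread-safe record of finished jobs
  end

  # Fire and forget: the job runs alongside other pending jobs.
  def async(name, duration)
    @step.enqueue_job(-> { sleep(duration); @log << name })
  end

  # Waits for pending jobs, runs this one, then waits for it too.
  def sync(name, duration)
    @step.flush_jobs
    async(name, duration)
    @step.flush_jobs
  end
end

pipeline = ToyPipeline.new
pipeline.async(:some_job, 0.05)
pipeline.async(:some_other_job, 0.01)
pipeline.sync(:hello_job, 0.0)

finished = []
finished << pipeline.log.pop until pipeline.log.empty?
puts finished.inspect
```

The relative order of the two `async` jobs depends on timing, but in this model `:hello_job` always finishes last, because `sync` flushes the pending jobs before enqueueing its own.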