Class: Ductr::PipelineRunner
- Inherits:
-
Object
- Object
- Ductr::PipelineRunner
- Defined in:
- lib/ductr/pipeline_runner.rb
Overview
In charge to parse pipeline annotations, initializing and running pipeline steps.
Constant Summary collapse
- TICK =
Returns Time to wait in second before resuming all alive steps.
0.1
Instance Attribute Summary collapse
-
#remaining_steps ⇒ Array<PipelineStep>
readonly
The remaining steps to run.
-
#steps ⇒ Array<PipelineStep>
readonly
All the steps declared in the pipeline.
Instance Method Summary collapse
-
#current_step ⇒ PipelineStep
Returns the current step based on fiber execution context.
-
#initialize(pipeline) ⇒ PipelineRunner
constructor
Parses and initializes the given pipeline’s steps.
-
#run ⇒ void
Actually runs the pipeline.
-
#step_by(**name_and_val) ⇒ PipelineStep, Nil
Finds a step corresponding to the given name and value.
-
#step_names(annotated_methods) ⇒ Array<Symbol>
Parses given annotated methods and extract all step names.
Constructor Details
#initialize(pipeline) ⇒ PipelineRunner
Parses and initializes the given pipeline’s steps.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/ductr/pipeline_runner.rb', line 21 def initialize(pipeline) annotated_methods = pipeline.class.annotated_methods @steps = step_names(annotated_methods).map do |name| PipelineStep.new(pipeline, name) end annotated_methods.each do |method| step_by(name: method.name).left = method.find_annotation(:after).params.map do |left_step_name| step_by(name: left_step_name) end end @remaining_steps = @steps.dup end |
Instance Attribute Details
#remaining_steps ⇒ Array<PipelineStep> (readonly)
Returns The remaining steps to run.
14 15 16 |
# File 'lib/ductr/pipeline_runner.rb', line 14 def remaining_steps @remaining_steps end |
#steps ⇒ Array<PipelineStep> (readonly)
Returns All the steps declared in the pipeline.
12 13 14 |
# File 'lib/ductr/pipeline_runner.rb', line 12 def steps @steps end |
Instance Method Details
#current_step ⇒ PipelineStep
Returns the current step based on fiber execution context.
60 61 62 |
# File 'lib/ductr/pipeline_runner.rb', line 60 def current_step step_by fiber: Fiber.current end |
#run ⇒ void
This method returns an undefined value.
Actually runs the pipeline. Resumes step’s fiber until they are all finished.
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/ductr/pipeline_runner.rb', line 43 def run until @remaining_steps.empty? @remaining_steps.each do |step| next @remaining_steps.delete(step) unless step.alive? step.resume end sleep(TICK) end end |
#step_by(**name_and_val) ⇒ PipelineStep, Nil
Finds a step corresponding to the given name and value.
87 88 89 90 91 92 93 |
# File 'lib/ductr/pipeline_runner.rb', line 87 def step_by(**name_and_val) name, value = *name_and_val.to_a.first steps.find do |step| step.send(name) == value end end |
#step_names(annotated_methods) ⇒ Array<Symbol>
Parses given annotated methods and extract all step names.
71 72 73 74 75 |
# File 'lib/ductr/pipeline_runner.rb', line 71 def step_names(annotated_methods) annotated_methods.flat_map do |method| [method.name, *method.find_annotation(:after).params] end.uniq end |