Class: Ductr::PipelineStep
- Inherits:
-
Object
- Object
- Ductr::PipelineStep
- Extended by:
- Forwardable
- Defined in:
- lib/ductr/pipeline_step.rb
Overview
Representation of a pipeline’s step. Hold a fiber to execute steps concurrently.
Instance Attribute Summary collapse
-
#jobs ⇒ Array<Job>
readonly
The step’s queued jobs.
-
#left ⇒ PipelineStep
The previous step.
-
#name ⇒ Symbol
readonly
The step’s name.
-
#pipeline ⇒ Pipeline
readonly
The step’s pipeline.
Instance Method Summary collapse
-
#alive? ⇒ Boolean
Check if the step’s fiber is running.
-
#done? ⇒ Boolean
Check if the step is done.
-
#enqueue_job(job) ⇒ void
Track, registers and enqueues the given job.
-
#fiber ⇒ Fiber
The step’s fiber instance, invokes the pipeline’s step method.
-
#flush_jobs ⇒ void
Waits until all step’s jobs are stopped.
-
#initialize(pipeline, name) ⇒ PipelineStep
constructor
Creates a step for the given pipeline.
-
#resume ⇒ void
Resumes the step’s fiber.
Constructor Details
#initialize(pipeline, name) ⇒ PipelineStep
Creates a step for the given pipeline.
37 38 39 40 41 42 43 |
# File 'lib/ductr/pipeline_step.rb', line 37 def initialize(pipeline, name) @pipeline = pipeline @name = name @jobs = [] @left = [] end |
Instance Attribute Details
#jobs ⇒ Array<Job> (readonly)
Returns The step’s queued jobs.
26 27 28 |
# File 'lib/ductr/pipeline_step.rb', line 26 def jobs @jobs end |
#left ⇒ PipelineStep
Returns The previous step.
29 30 31 |
# File 'lib/ductr/pipeline_step.rb', line 29 def left @left end |
#name ⇒ Symbol (readonly)
Returns The step’s name.
24 25 26 |
# File 'lib/ductr/pipeline_step.rb', line 24 def name @name end |
#pipeline ⇒ Pipeline (readonly)
Returns The step’s pipeline.
22 23 24 |
# File 'lib/ductr/pipeline_step.rb', line 22 def pipeline @pipeline end |
Instance Method Details
#alive? ⇒ Boolean
Check if the step’s fiber is running.
19 |
# File 'lib/ductr/pipeline_step.rb', line 19 def_delegators :fiber, :resume, :alive? |
#done? ⇒ Boolean
Check if the step is done.
63 64 65 |
# File 'lib/ductr/pipeline_step.rb', line 63 def done? !fiber.alive? end |
#enqueue_job(job) ⇒ void
This method returns an undefined value.
Track, registers and enqueues the given job.
52 53 54 55 56 |
# File 'lib/ductr/pipeline_step.rb', line 52 def enqueue_job(job) jobs.push(job) Store.register_job(job) job.enqueue end |
#fiber ⇒ Fiber
The step’s fiber instance, invokes the pipeline’s step method.
83 84 85 86 87 88 89 90 |
# File 'lib/ductr/pipeline_step.rb', line 83 def fiber @fiber ||= Fiber.new do Fiber.yield until left.all?(&:done?) pipeline.send(name) flush_jobs end end |
#flush_jobs ⇒ void
This method returns an undefined value.
Waits until all step’s jobs are stopped.
72 73 74 75 76 |
# File 'lib/ductr/pipeline_step.rb', line 72 def flush_jobs return if jobs.empty? Fiber.yield until Store.read_jobs(*jobs).all?(&:stopped?) end |
#resume ⇒ void
This method returns an undefined value.
Resumes the step’s fiber.
19 |
# File 'lib/ductr/pipeline_step.rb', line 19 def_delegators :fiber, :resume, :alive? |