Class: Ductr::PipelineRunner

Inherits:
Object
Defined in:
lib/ductr/pipeline_runner.rb

Overview

Responsible for parsing pipeline annotations, then initializing and running the pipeline steps.

Constant Summary

TICK = 0.1

Returns the time to wait, in seconds, before resuming all alive steps.

Returns:

  • (Float)

    Time to wait, in seconds, before resuming all alive steps

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(pipeline) ⇒ PipelineRunner

Parses and initializes the given pipeline’s steps.

Parameters:

  • pipeline (Pipeline)

    The pipeline to parse and run.



# File 'lib/ductr/pipeline_runner.rb', line 21

def initialize(pipeline)
  annotated_methods = pipeline.class.annotated_methods

  @steps = step_names(annotated_methods).map do |name|
    PipelineStep.new(pipeline, name)
  end

  annotated_methods.each do |method|
    step_by(name: method.name).left = method.find_annotation(:after).params.map do |left_step_name|
      step_by(name: left_step_name)
    end
  end

  @remaining_steps = @steps.dup
end
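
The wiring done by the constructor can be illustrated outside of Ductr. This is a minimal sketch under stated assumptions: `SketchAnnotation`, `SketchMethod` and `WiredStep` are hypothetical stand-ins for Annotable's annotated methods and for PipelineStep, and every method is assumed to carry an `after` annotation (possibly with no parameters). It only shows how step names become step objects and how each step's `left` is filled in.

```ruby
# Hypothetical stand-ins, not the real Annotable or Ductr classes.
SketchAnnotation = Struct.new(:params)
SketchMethod = Struct.new(:name, :after) do
  # Mimics the find_annotation(:after) call used by the constructor.
  def find_annotation(_name)
    SketchAnnotation.new(after)
  end
end
WiredStep = Struct.new(:name, :left)

methods = [
  SketchMethod.new(:extract, []),
  SketchMethod.new(:transform, [:extract]),
  SketchMethod.new(:load, [:transform])
]

# One step per distinct name, in order of first appearance.
names = methods.flat_map { |m| [m.name, *m.find_annotation(:after).params] }.uniq
steps = names.map { |name| WiredStep.new(name, []) }
by_name = steps.to_h { |step| [step.name, step] }

# Link each step to the steps it must run after.
methods.each do |m|
  by_name[m.name].left = m.find_annotation(:after).params.map { |n| by_name[n] }
end

left_of_load = by_name[:load].left.map(&:name)
```

Here the lookup goes through a hash for brevity, whereas the documented constructor resolves names with `step_by`.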

Instance Attribute Details

#remaining_steps ⇒ Array<PipelineStep> (readonly)

Returns the remaining steps to run.

Returns:

  • (Array<PipelineStep>)

    The remaining steps to run


# File 'lib/ductr/pipeline_runner.rb', line 14

def remaining_steps
  @remaining_steps
end

#steps ⇒ Array<PipelineStep> (readonly)

Returns all the steps declared in the pipeline.

Returns:

  • (Array<PipelineStep>)

    All the steps declared in the pipeline



# File 'lib/ductr/pipeline_runner.rb', line 12

def steps
  @steps
end

Instance Method Details

#current_step ⇒ PipelineStep

Returns the current step, determined from the fiber that is currently executing.

Returns:

  • (PipelineStep)



# File 'lib/ductr/pipeline_runner.rb', line 60

def current_step
  step_by fiber: Fiber.current
end
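
The fiber-based lookup can be tried with plain Ruby. This is a sketch only: `FiberStep` is a hypothetical stand-in for PipelineStep, assumed to expose the fiber it runs in through a `fiber` reader, which is what `step_by fiber: Fiber.current` relies on.

```ruby
require "fiber" # needed for Fiber.current on Ruby versions before 3.1

# Hypothetical stand-in for PipelineStep exposing the fiber it runs in.
FiberStep = Struct.new(:name, :fiber)

steps = []
seen = []

%i[extract load].each do |name|
  fiber = Fiber.new do
    # Inside the block, Fiber.current is the fiber being resumed,
    # so the lookup resolves to the step that owns it.
    seen << steps.find { |step| step.fiber == Fiber.current }.name
  end
  steps << FiberStep.new(name, fiber)
end

steps.each { |step| step.fiber.resume }
```

Each fiber finds its own step, so code running inside a step can ask the runner which step it belongs to without being passed a reference.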

#run ⇒ void

This method returns an undefined value.

Runs the pipeline by resuming each step’s fiber until all of them have finished.



# File 'lib/ductr/pipeline_runner.rb', line 43

def run
  until @remaining_steps.empty?
    @remaining_steps.each do |step|
      next @remaining_steps.delete(step) unless step.alive?

      step.resume
    end

    sleep(TICK)
  end
end
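
The scheduling loop can be reproduced in isolation with plain fibers. The following is a minimal sketch, not Ductr's implementation: `LoopStep` is a hypothetical stand-in for PipelineStep that exposes only `alive?` and `resume`, the tick is an assumed shorter value than the documented 0.1, and the loop iterates over a copy of the list so that removing a finished step cannot skip its neighbour.

```ruby
# Hypothetical stand-in for PipelineStep: wraps a fiber and exposes
# only the two methods the run loop relies on.
class LoopStep
  def initialize(name, rounds, log)
    @fiber = Fiber.new do
      rounds.times do |i|
        log << "#{name}:#{i}"
        Fiber.yield # hand control back to the runner
      end
    end
  end

  def alive?
    @fiber.alive?
  end

  def resume
    @fiber.resume
  end
end

# tick is an assumed demo value, shorter than the documented TICK.
def run_all(steps, tick: 0.001)
  remaining = steps.dup
  until remaining.empty?
    # Iterate over a copy so that deleting does not disturb the iteration.
    remaining.dup.each do |step|
      next remaining.delete(step) unless step.alive?

      step.resume
    end
    sleep(tick)
  end
end

log = []
run_all([LoopStep.new(:a, 2, log), LoopStep.new(:b, 1, log)])
```

The steps are interleaved round by round: each pass resumes every step that is still alive once, then the runner sleeps for one tick.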

#step_by(**name_and_val) ⇒ PipelineStep, nil

Finds the first step whose given attribute equals the given value.

Examples:

Finds a step named :my_step

step_by(name: :my_step)

Parameters:

  • **name_and_val (Hash{Symbol => Object})

    The step attribute’s name and the value it must equal

Returns:

  • (PipelineStep, nil)

    The matching step, or nil when no step matches



# File 'lib/ductr/pipeline_runner.rb', line 87

def step_by(**name_and_val)
  name, value = *name_and_val.to_a.first

  steps.find do |step|
    step.send(name) == value
  end
end
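
The keyword-based lookup works with any readable attribute, not just the name. A small sketch, assuming a hypothetical `LookupStep` struct in place of PipelineStep and a free-standing `find_step` helper; it uses `public_send` rather than the `send` shown above, which is a deliberate substitution so that only public readers can be queried.

```ruby
# Hypothetical stand-in for PipelineStep with two readable attributes.
LookupStep = Struct.new(:name, :fiber)

def find_step(steps, **name_and_val)
  # Only the first key/value pair is used, as in the documented method.
  attribute, value = name_and_val.first
  steps.find { |step| step.public_send(attribute) == value }
end

steps = [LookupStep.new(:extract, :fiber_a), LookupStep.new(:load, :fiber_b)]

found    = find_step(steps, name: :load)
by_fiber = find_step(steps, fiber: :fiber_a)
missing  = find_step(steps, name: :unknown)
```

When nothing matches, `find` returns nil, so callers should be prepared for a missing step.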

#step_names(annotated_methods) ⇒ Array<Symbol>

Parses the given annotated methods and extracts all step names.

Parameters:

  • annotated_methods (Array<Annotable::AnnotatedMethod>)

    The annotated methods to parse

Returns:

  • (Array<Symbol>)

    The declared steps’ names



# File 'lib/ductr/pipeline_runner.rb', line 71

def step_names(annotated_methods)
  annotated_methods.flat_map do |method|
    [method.name, *method.find_annotation(:after).params]
  end.uniq
end