Class: Ductr::PipelineStep

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/ductr/pipeline_step.rb

Overview

Representation of a pipeline’s step. Hold a fiber to execute steps concurrently.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pipeline, name) ⇒ PipelineStep

Creates a step for the given pipeline.

Parameters:

  • pipeline (Pipeline)

    The pipeline containing step’s method

  • The (Symbol)

    name of the pipeline’s step method



37
38
39
40
41
42
43
# File 'lib/ductr/pipeline_step.rb', line 37

def initialize(pipeline, name)
  @pipeline = pipeline
  @name = name

  @jobs = []
  @left = []
end

Instance Attribute Details

#jobsArray<Job> (readonly)

Returns The step’s queued jobs.

Returns:

  • (Array<Job>)

    The step’s queued jobs



26
27
28
# File 'lib/ductr/pipeline_step.rb', line 26

def jobs
  @jobs
end

#leftPipelineStep

Returns The previous step.

Returns:



29
30
31
# File 'lib/ductr/pipeline_step.rb', line 29

def left
  @left
end

#nameSymbol (readonly)

Returns The step’s name.

Returns:

  • (Symbol)

    The step’s name



24
25
26
# File 'lib/ductr/pipeline_step.rb', line 24

def name
  @name
end

#pipelinePipeline (readonly)

Returns The step’s pipeline.

Returns:



22
23
24
# File 'lib/ductr/pipeline_step.rb', line 22

def pipeline
  @pipeline
end

Instance Method Details

#alive?Boolean

Check if the step’s fiber is running.

Returns:

  • (Boolean)

    True if step’s fiber is running



19
# File 'lib/ductr/pipeline_step.rb', line 19

def_delegators :fiber, :resume, :alive?

#done?Boolean

Check if the step is done.

Returns:

  • (Boolean)

    True if the step is done



63
64
65
# File 'lib/ductr/pipeline_step.rb', line 63

def done?
  !fiber.alive?
end

#enqueue_job(job) ⇒ void

This method returns an undefined value.

Track, registers and enqueues the given job.

Parameters:

  • job (Job)

    The job to enqueue



52
53
54
55
56
# File 'lib/ductr/pipeline_step.rb', line 52

def enqueue_job(job)
  jobs.push(job)
  Store.register_job(job)
  job.enqueue
end

#fiberFiber

The step’s fiber instance, invokes the pipeline’s step method.

Returns:

  • (Fiber)

    The step’s fiber



83
84
85
86
87
88
89
90
# File 'lib/ductr/pipeline_step.rb', line 83

def fiber
  @fiber ||= Fiber.new do
    Fiber.yield until left.all?(&:done?)

    pipeline.send(name)
    flush_jobs
  end
end

#flush_jobsvoid

This method returns an undefined value.

Waits until all step’s jobs are stopped.



72
73
74
75
76
# File 'lib/ductr/pipeline_step.rb', line 72

def flush_jobs
  return if jobs.empty?

  Fiber.yield until Store.read_jobs(*jobs).all?(&:stopped?)
end

#resumevoid

This method returns an undefined value.

Resumes the step’s fiber.



19
# File 'lib/ductr/pipeline_step.rb', line 19

def_delegators :fiber, :resume, :alive?