Class: Ductr::Pipeline

Inherits:
Job
  • Object
Defined in:
lib/ductr/pipeline.rb

Overview

Pipelines make it easy to declare rich data pipelines.

By using the `after` annotation, you can define the execution hierarchy of the steps.

`sync` and `async` are useful to define job sequences inside step methods.

`Pipeline` inherits from `Job`, which means that pipelines are enqueued like any other job. Pipelines are enqueued in the `:ductr_pipelines` queue.

class MyPipeline < Ductr::Pipeline
  def first_step
    sync(MyJob, 1)
    async(SomeJob) # Executed when `MyJob` is done
  end

  after :first_step
  def first_parallel_step # Returns when all three `HelloJob` are done
    async(HelloJob, :one)
    async(HelloJob, :two)
    async(HelloJob, :three)
  end

  after :first_step
  def second_parallel_step # Executed concurrently with :first_parallel_step
    async(SomeJob)
    async(SomeOtherJob)
    sync(HelloJob, :one) # Executed when `SomeJob` and `SomeOtherJob` are done
  end

  after :first_parallel_step, :second_parallel_step
  def last_step # Executed when `first_parallel_step` and `second_parallel_step` jobs are done
    sync(ByeJob)
  end
end

You can define pipelines with only one step by using the `after` annotation without parameters:

class MonoStepPipeline < Ductr::Pipeline
  after
  def unique_step
    async(MyJob)
    async(MyJob)
  end
end

A pipeline can inherit from another, allowing you to override steps of the parent pipeline and add new ones:

class InheritPipeline < MonoStepPipeline
  after :unique_step
  def not_that_unique
    async(MyJob)
  end
end

Instance Attribute Summary

Attributes inherited from Job

#error, #status

Class Method Summary

Instance Method Summary

Methods inherited from Job

#adapter, #logger, #perform

Methods included from JobStatus

included, #stopped?

Constructor Details

#initialize ⇒ Pipeline

Initializes the pipeline runner



# File 'lib/ductr/pipeline.rb', line 89

def initialize(...)
  super(...)

  @runner = PipelineRunner.new(self)
end

Instance Attribute Details

#runner ⇒ PipelineRunner (readonly)

Returns the pipeline’s runner instance.

Returns:

  • (PipelineRunner)

    The pipeline’s runner instance



# File 'lib/ductr/pipeline.rb', line 77

def runner
  @runner
end

Class Method Details

.after ⇒ void

This method returns an undefined value.

Annotation to define preceding steps on a pipeline step method.

Examples:

after :some_step_method, :some_other_step_method
def my_step
  # ...
end


# File 'lib/ductr/pipeline.rb', line 72

annotable :after
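
To make the ordering implied by `after` concrete, the sketch below feeds the dependencies from the `MyPipeline` example in the overview into Ruby's standard `TSort` module. It is only an illustration of the dependency order: `StepGraph` and its hash of dependencies are hypothetical names, and nothing here is taken from Ductr's own runner, which may resolve steps differently.

```ruby
require "tsort"

# Hypothetical stand-in for the table built by `after` annotations:
# each key is a step method, each value lists the steps it waits for.
class StepGraph
  include TSort

  def initialize(dependencies)
    @dependencies = dependencies
  end

  # TSort hook: iterate over every declared step.
  def tsort_each_node(&block)
    @dependencies.each_key(&block)
  end

  # TSort hook: iterate over the steps that must finish before `step`.
  def tsort_each_child(step, &block)
    @dependencies.fetch(step).each(&block)
  end
end

graph = StepGraph.new(
  first_step: [],
  first_parallel_step: [:first_step],
  second_parallel_step: [:first_step],
  last_step: %i[first_parallel_step second_parallel_step]
)

# A valid execution order: every step appears after the steps it depends on.
order = graph.tsort
```

A topological order is sequential, so it does not show that `first_parallel_step` and `second_parallel_step` can run concurrently; it only shows that both come after `first_step` and before `last_step`.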

Instance Method Details

#async(job_class, *params) ⇒ void

This method returns an undefined value.

Enqueues the given job.

Parameters:

  • job_class (Class<Job>)

    The job to enqueue

  • *params (Array<Object>)

    The job’s params



# File 'lib/ductr/pipeline.rb', line 117

def async(job_class, *params)
  @runner.current_step.enqueue_job job_class.new(*params)
end

#run ⇒ void

This method returns an undefined value.

Starts the pipeline runner.



# File 'lib/ductr/pipeline.rb', line 84

def_delegators :@runner, :run

#status=(status) ⇒ void

This method returns an undefined value.

Writes the pipeline’s status into Ductr’s store.

Parameters:

  • status (Symbol)

    The status of the job



# File 'lib/ductr/pipeline.rb', line 128

def status=(status)
  @status = status
  Store.write_pipeline(self)
end

#sync(job_class, *params) ⇒ void

This method returns an undefined value.

Puts the given job in the queue and waits for it to be done.

Parameters:

  • job_class (Class<Job>)

    The job to enqueue

  • *params (Array<Object>)

    The job’s params



# File 'lib/ductr/pipeline.rb', line 103

def sync(job_class, *params)
  @runner.current_step.flush_jobs
  @runner.current_step.enqueue_job job_class.new(*params)
  @runner.current_step.flush_jobs
end
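
The flush, enqueue, flush sequence in `#sync` is what makes a synchronous job wait for earlier `async` jobs and then block until it is itself done. The toy model below reproduces that ordering without Ductr. `FakeStep` and `FakePipeline` are hypothetical stand-ins, jobs are plain symbols rather than `Job` instances, and the behaviour of `enqueue_job` and `flush_jobs` is assumed from their names, not taken from the real step class.

```ruby
# Hypothetical stand-in for the runner's current step: it records events
# instead of talking to a real queue.
class FakeStep
  attr_reader :events

  def initialize
    @pending = []
    @events = []
  end

  # Assumed behaviour: start the job without waiting for it.
  def enqueue_job(job)
    @pending << job
    @events << [:enqueued, job]
  end

  # Assumed behaviour: wait until every pending job is done.
  def flush_jobs
    @pending.each { |job| @events << [:done, job] }
    @pending.clear
  end
end

# Hypothetical pipeline mirroring the bodies of #async and #sync.
class FakePipeline
  attr_reader :step

  def initialize
    @step = FakeStep.new
  end

  def async(job)
    @step.enqueue_job(job)
  end

  def sync(job)
    @step.flush_jobs       # wait for jobs enqueued earlier in the step
    @step.enqueue_job(job)
    @step.flush_jobs       # wait for the job that was just enqueued
  end
end

pipeline = FakePipeline.new
pipeline.async(:some_job)
pipeline.async(:some_other_job)
pipeline.sync(:hello_job)

events = pipeline.step.events
```

In this model, `:hello_job` is enqueued only after both asynchronous jobs are recorded as done, which matches the comment on `sync(HelloJob, :one)` in the overview example.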