Class: Turbine::Pipeline::Segment

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/turbine/pipeline/segment.rb

Overview

Represents a single stage in a pipeline. A pipeline may contain many segments, each of which transforms or filters the elements that pass through it.

Direct Known Subclasses

Expander, Filter, Journal, Pump, Trace, Transform

Constructor Details

#initialize ⇒ Segment

Public: Creates a new Segment. A bare Segment is of little value in a pipeline, since it simply emits every value it is given. Use Pump, Transform, or Filter instead.

Returns a Segment.



# File 'lib/turbine/pipeline/segment.rb', line 17

def initialize
  @tracing = false
end

Instance Attribute Details

#source ⇒ Object

The previous segment in the pipeline.



# File 'lib/turbine/pipeline/segment.rb', line 10

def source
  @source
end

Instance Method Details

#append(other) ⇒ Object Also known as: |

Public: Appends another segment, which receives the values emitted by this segment. Instead of a Segment instance, a callable such as a lambda or proc can be given; it is wrapped in a Transform.

other - The segment, or a callable to use as a Transform, to be run after this segment.

For example, transforming three numbers using a Transform segment:

Pump.new([10, 20, 30]).append(Transform.new { |x| Math.sqrt(x) })

Or using a lambda:

Pump.new([10, 20, 30]).append(->(x) { x ** 10 })

Or using Dave Thomas’ pipes syntax:

pump       = Pump.new((100..10000).to_a)
divide     = Transform.new { |x| x / 100 }
the_answer = Filter.new { |x| x == 42 }

(pump | divide | the_answer).next # => 42

Returns the other segment.



# File 'lib/turbine/pipeline/segment.rb', line 44

def append(other)
  if other.respond_to?(:call)
    other = Transform.new(&other)
  end

  other.source = self
  other
end
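
The chaining behaviour described above can be sketched without Turbine installed. The classes below (MiniSegment, MiniPump, MiniTransform, MiniFilter) are hypothetical stand-ins written for this example, not Turbine's own implementation; they only assume that each segment pulls values from its source and that append returns the segment it was given.

```ruby
# Hypothetical stand-ins for illustration only; these are NOT Turbine's classes.
class MiniSegment
  attr_accessor :source

  # Link `other` after this segment, wrapping callables in a MiniTransform.
  def append(other)
    other = MiniTransform.new(&other) if other.respond_to?(:call)
    other.source = self
    other
  end
  alias_method :|, :append

  # Pull one value from the source and pass it through unchanged.
  def next
    source.next
  end
end

class MiniPump < MiniSegment
  def initialize(values)
    @enum = values.each
  end

  # Enumerator#next raises StopIteration once the values are exhausted.
  def next
    @enum.next
  end
end

class MiniTransform < MiniSegment
  def initialize(&block)
    @block = block
  end

  def next
    @block.call(source.next)
  end
end

class MiniFilter < MiniSegment
  def initialize(&block)
    @block = block
  end

  # Keep pulling until a value satisfies the block.
  def next
    value = source.next
    value = source.next until @block.call(value)
    value
  end
end

pump     = MiniPump.new((100..10_000).to_a)
pipeline = pump | ->(x) { x / 100 } | MiniFilter.new { |x| x == 42 }

first = pipeline.next # => 42
```

Because append returns the segment it was given, the chained expression evaluates to the last segment, and calling next on it pulls values through every earlier stage.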

#each ⇒ Object

Public: Iterates through each value in the pipeline.

Returns nothing.



# File 'lib/turbine/pipeline/segment.rb', line 68

def each
  rewind
  loop { yield self.next }
end
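
The each method relies on Kernel#loop, which rescues StopIteration and exits quietly, so no explicit end-of-input check is needed. A minimal sketch of the same pattern, using a hypothetical Countdown class that is not part of Turbine:

```ruby
# Hypothetical stand-in, not part of Turbine.
class Countdown
  include Enumerable

  def initialize(from)
    @from = from
    rewind
  end

  # Start again from the first value.
  def rewind
    @current = @from
  end

  # Return the next value, or raise StopIteration when exhausted.
  def next
    raise StopIteration if @current.zero?

    value = @current
    @current -= 1
    value
  end

  # Same shape as Segment#each: rewind, then loop until StopIteration.
  def each
    rewind
    loop { yield self.next }
  end
end

countdown = Countdown.new(3)
countdown.next                        # consume one value by hand

all     = countdown.to_a              # => [3, 2, 1]
doubled = countdown.map { |n| n * 2 } # => [6, 4, 2]
```

Since each rewinds first, values consumed earlier through next are not skipped, and including Enumerable provides to_a, map, select, and the rest on top of each.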

#inspect ⇒ Object

Public: A human-readable version of the segment for debugging.

Returns a String.



# File 'lib/turbine/pipeline/segment.rb', line 145

def inspect
  to_s
end

#next ⇒ Object

Public: Runs the pipeline once, returning the next value. Repeatedly calling this will yield each value in turn. Once all values have been emitted a StopIteration is raised (mimicking the behaviour of Enumerator).

Returns an object.



# File 'lib/turbine/pipeline/segment.rb', line 61

def next
  handle_value(input)
end
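
Since next mimics Enumerator, a caller that pulls values by hand needs to handle StopIteration in the same way. The sketch below uses a plain Ruby Enumerator rather than a Turbine segment, on the assumption that a segment behaves the same way at the end of its input:

```ruby
enum = [:a, :b].each # a plain Enumerator standing in for a pipeline

first  = enum.next   # => :a
second = enum.next   # => :b

# A third call has nothing left to emit, so StopIteration is raised.
finished = begin
  enum.next
  false
rescue StopIteration
  true
end

finished # => true
```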

#rewind ⇒ Object

Public: Rewinds the segment so that iteration can happen from the first input again.

Returns nothing.



# File 'lib/turbine/pipeline/segment.rb', line 77

def rewind
  @source.rewind
  @previous = nil
end

#to_s ⇒ Object

Public: Describes the segments through which each input will pass.

For example:

pipeline.to_s
# => "Pump | Sender(out) | Filter"

Returns a string.



# File 'lib/turbine/pipeline/segment.rb', line 131

def to_s
  name = self.class.name

  # Nicked from ActiveSupport since it's faster than gsub, and more
  # memory-efficient than split.
  name = (index = name.rindex('::')) ? name[(index + 2)..-1] : name

  source_string = source_to_s
  source_string.nil? ? name : "#{ source_string } | #{ name }"
end
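
The rindex expression in to_s strips the namespace from a class name. It can be tried in isolation; demodulize below is a hypothetical helper name chosen for this example (borrowed from the ActiveSupport method the comment alludes to), not a method defined on Segment:

```ruby
# Hypothetical helper mirroring the rindex-based expression in #to_s.
def demodulize(name)
  (index = name.rindex('::')) ? name[(index + 2)..-1] : name
end

demodulize('Turbine::Pipeline::Filter') # => "Filter"
demodulize('Filter')                    # => "Filter"

# Joining the short names gives the same shape as the to_s output.
names     = %w[Turbine::Pipeline::Pump Turbine::Pipeline::Filter]
described = names.map { |name| demodulize(name) }.join(' | ')
# => "Pump | Filter"
```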

#trace ⇒ Object

Public: Returns the trace containing the most recently emitted values for all the source segments, appending this segment’s value to the end of the array.

For example:

segment.next && segment.trace
# => [[ #<Node key=:jay>, #<Node key=:claire>, #<Node key=:haley> ]]

segment.next && segment.trace
# => [[ #<Node key=:jay>, #<Node key=:claire>, #<Node key=:alex> ]]

Tracing must be enabled (normally by appending a Trace segment to the pipeline); otherwise a TracingNotEnabledError is raised.

Subclasses may call super with a block; the sole argument given to the block will be the trace from the source segments.

Returns an array.



# File 'lib/turbine/pipeline/segment.rb', line 114

def trace
  unless @tracing
    raise TracingNotEnabledError.new(self)
  end

  trace = @source.respond_to?(:trace) ? @source.trace.dup : []
  block_given? ? yield(trace) : trace.push(@previous)
end
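
The "call super with a block" hook can be illustrated with a small sketch. BaseStep and LabelledStep are hypothetical classes written for this example; they copy only the last two lines of the trace method and leave out the tracing check, so they show the shape of the hook rather than Turbine's actual subclasses.

```ruby
# Hypothetical stand-ins, not Turbine's classes.
class BaseStep
  def initialize(source, previous)
    @source   = source   # the preceding step, or nil at the head
    @previous = previous # the value this step "most recently emitted"
  end

  # Copy the source's trace, then either hand it to the block or
  # append this step's own value.
  def trace
    trace = @source.respond_to?(:trace) ? @source.trace.dup : []
    block_given? ? yield(trace) : trace.push(@previous)
  end
end

class LabelledStep < BaseStep
  # Override: record a labelled pair instead of the bare value.
  def trace
    super { |trace| trace.push([:labelled, @previous]) }
  end
end

first  = BaseStep.new(nil, :jay)
second = BaseStep.new(first, :claire)
third  = LabelledStep.new(second, :haley)

result = third.trace
# => [:jay, :claire, [:labelled, :haley]]
```

Each step duplicates its source's trace before modifying it, so calling trace on a later step does not mutate the array returned by an earlier one.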

#tracing=(use_tracing) ⇒ Object

Public: Enables tracing on the segment and its source. This tells the segment to keep track of the most recently emitted value for use in a subsequent Trace segment.

Returns the tracing setting.



# File 'lib/turbine/pipeline/segment.rb', line 87

def tracing=(use_tracing)
  @tracing = use_tracing

  if @source && @source.respond_to?(:tracing=)
    @source.tracing = use_tracing
  end
end
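
Because each segment forwards the setting to its source, assigning tracing on the last segment switches it on for the whole chain. A minimal sketch of that propagation, using a hypothetical Link class (with a tracing reader added purely so the result can be inspected):

```ruby
# Hypothetical stand-in, not part of Turbine.
class Link
  attr_reader :tracing

  def initialize(source = nil)
    @source  = source
    @tracing = false
  end

  # Same shape as Segment#tracing=: set the flag, then pass it upstream.
  def tracing=(use_tracing)
    @tracing = use_tracing

    if @source && @source.respond_to?(:tracing=)
      @source.tracing = use_tracing
    end
  end
end

head   = Link.new
middle = Link.new(head)
tail   = Link.new(middle)

tail.tracing = true

flags = [head, middle, tail].map(&:tracing)
# => [true, true, true]
```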