Class: Mementus::Pipeline::Step

Inherits:
Object
Defined in:
lib/mementus/pipeline/step.rb

Overview

Represents a step in a pipeline chain.

New steps are constructed from a `source` enumerable (usually the previous step in the chain) and an optional `pipe`, which provides the strategy for generating output values.

Each step has an internal `Fiber` context that yields control to the next step in the chain after each value in the iteration, rather than cycling through the entire list of values before forwarding control.

This avoids the problem of iterating over a huge set of nodes and edges which are then discarded by a later step.

The approach here is roughly similar to the way that Ruby chains together `Enumerator::Lazy` objects.
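
For comparison, here is a minimal sketch of the `Enumerator::Lazy` behaviour referred to above. It uses only core Ruby and does not touch Mementus; the variable names are illustrative. The `visited` counter shows that only as many source values are pulled as the final consumer needs.

```ruby
# Counts how many source values the chain actually pulls.
visited = 0

# An unbounded source: eager evaluation of the chain would never finish.
numbers = (1..Float::INFINITY).lazy

squares = numbers.map do |n|
  visited += 1
  n * n
end

evens = squares.select(&:even?)

# Values flow through map and select one at a time, and the chain stops
# as soon as three results have been collected.
result = evens.first(3)
# result  => [4, 16, 36]
# visited => 6
```

Each value passes through every stage before the next value is requested, which is the same per-value hand-off that the step's `Fiber` context is described as providing.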


Constructor Details

#initialize(source, pipe = nil, graph = nil) ⇒ Step

Initialize a pipeline step from the given source.

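Parameters:
  • source: enumerable providing the input values, usually the previous step in the chain
  • pipe: optional strategy for generating output values; defaults to a new Pipe built from graph
  • graph: optional graph, kept so that later traversal steps can query it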



# File 'lib/mementus/pipeline/step.rb', line 23

def initialize(source, pipe=nil, graph=nil)
  @source = source
  @pipe = pipe || Pipe.new(graph)
  @graph = graph
end

Instance Method Details

#all ⇒ Object

Returns all values in the sequence.



# File 'lib/mementus/pipeline/step.rb', line 102

def all
  to_enum.to_a
end

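#breadth_first(method = :out) ⇒ Object

Returns a new step wrapping a breadth-first search that starts from the first element of the source.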



# File 'lib/mementus/pipeline/step.rb', line 115

def breadth_first(method=:out)
  Step.new(BreadthFirstSearch.new(graph, source.first.id, method))
end

#depth_first(method = :out) ⇒ Object

Returns a new step wrapping a depth-first search that starts from the first element of the source.



# File 'lib/mementus/pipeline/step.rb', line 111

def depth_first(method=:out)
  Step.new(DepthFirstSearch.new(graph, source.first.id, method))
end
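
The internals of `BreadthFirstSearch` and `DepthFirstSearch` are not shown in this section, so the following is only a generic sketch of how the two visiting orders differ. The adjacency hash and both helper methods are hypothetical and are not part of Mementus.

```ruby
# Hypothetical adjacency list: node id => ids of its outgoing neighbours.
GRAPH = {
  1 => [2, 3],
  2 => [4],
  3 => [4],
  4 => []
}.freeze

# Visits nodes level by level, using a FIFO queue.
def breadth_first_order(graph, start)
  visited = [start]
  queue = [start]

  until queue.empty?
    current = queue.shift
    graph.fetch(current, []).each do |neighbour|
      next if visited.include?(neighbour)

      visited << neighbour
      queue << neighbour
    end
  end

  visited
end

# Follows each branch as far as possible before backtracking.
def depth_first_order(graph, start, visited = [])
  visited << start
  graph.fetch(start, []).each do |neighbour|
    depth_first_order(graph, neighbour, visited) unless visited.include?(neighbour)
  end
  visited
end

breadth_first_order(GRAPH, 1)  # => [1, 2, 3, 4]
depth_first_order(GRAPH, 1)    # => [1, 2, 4, 3]
```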

#each ⇒ Object

Loop through each value in the sequence, yielding control to the next step if necessary.

If a block is provided, it is called with each value. Otherwise, an `Enumerator` (created with `to_enum`) that produces the step's values on demand is returned.



# File 'lib/mementus/pipeline/step.rb', line 37

def each
  return to_enum unless block_given?

  context = Fiber.new do
    source.each do |element|
      pipe.call(element)
    end

    raise StopIteration # propagates out of resume and ends the loop below
  end

  loop do
    yield context.resume
  end
end
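
The hand-off between the fiber and the consuming block can be tried in isolation with the sketch below. `SketchPipe` and `SketchStep` are hypothetical stand-ins, not the library's classes, and the sketch assumes that the pipe passes each value back with `Fiber.yield`, which this section does not show.

```ruby
# Hypothetical pipe: hands each value back to whoever called Fiber#resume.
# (Assumption: the real Pipe does something comparable.)
class SketchPipe
  def call(element)
    Fiber.yield(element)
  end
end

# Hypothetical step that mirrors the structure of the each method above.
class SketchStep
  def initialize(source, pipe = SketchPipe.new)
    @source = source
    @pipe = pipe
  end

  def each
    return to_enum unless block_given?

    context = Fiber.new do
      @source.each { |element| @pipe.call(element) }
      raise StopIteration # rescued by Kernel#loop, which then exits
    end

    loop { yield context.resume }
  end
end

# A source that records every value it is asked to produce.
log = []
source = Enumerator.new do |yielder|
  (1..5).each do |n|
    log << n
    yielder << n
  end
end

first_two = SketchStep.new(source).each.take(2)
# first_two => [1, 2]
# log       => [1, 2]
```

Only two values are pulled from the source even though it could produce five, because the fiber is resumed once per value requested.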

#first ⇒ Object

Returns the first value in the sequence.



# File 'lib/mementus/pipeline/step.rb', line 97

def first
  to_enum.first
end

#id ⇒ Object

Dereference ids from the source elements. Returns a single id when the sequence has exactly one element; otherwise returns an array of ids.



# File 'lib/mementus/pipeline/step.rb', line 54

def id
  ids = to_enum.map { |element| element.id }
  return ids.first if ids.length == 1
  ids
end
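
Because the return type depends on the number of elements, callers may want to normalise the result. The sketch below reproduces the same branching on a hypothetical `Element` struct; `dereference_ids` is an illustrative helper, not a Mementus method.

```ruby
# Hypothetical element type exposing only an id.
Element = Struct.new(:id)

# Same logic as the method above, applied to a plain array.
def dereference_ids(elements)
  ids = elements.map { |element| element.id }
  return ids.first if ids.length == 1

  ids
end

dereference_ids([Element.new(7)])                  # => 7
dereference_ids([Element.new(7), Element.new(9)])  # => [7, 9]
dereference_ids([])                                # => []

# Kernel#Array wraps a bare value and leaves an array untouched, which
# gives a uniform shape as long as the ids themselves are scalar values.
Array(dereference_ids([Element.new(7)]))           # => [7]
```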

#in(match = nil) ⇒ Object

Traverse to the incoming nodes pointing to the source elements.



# File 'lib/mementus/pipeline/step.rb', line 70

def in(match=nil)
  incoming_nodes = source.inject([]) do |result, node|
    result.concat(node.incoming(match))
  end

  Step.new(incoming_nodes, Pipe.new(graph), graph)
end

#in_e(match = nil) ⇒ Object

Traverse to the incoming edges pointing to the source elements.



# File 'lib/mementus/pipeline/step.rb', line 88

def in_e(match=nil)
  incoming_edges = source.inject([]) do |result, node|
    result.concat(graph.incoming_edges(node.id, match))
  end

  Step.new(incoming_edges, Pipe.new(graph), graph)
end

#out(match = nil) ⇒ Object

Traverse to the outgoing nodes adjacent to the source elements.



# File 'lib/mementus/pipeline/step.rb', line 61

def out(match=nil)
  outgoing_nodes = source.inject([]) do |result, node|
    result.concat(node.outgoing(match))
  end

  Step.new(outgoing_nodes, Pipe.new(graph), graph)
end
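
The `inject` and `concat` pattern used by the traversal methods flattens the neighbours of every source node into a single array. The sketch below shows this on a hypothetical `SketchNode` struct whose `outgoing` method ignores the match argument; the real node API may differ.

```ruby
# Hypothetical node that simply stores its adjacent nodes.
SketchNode = Struct.new(:id, :neighbours) do
  def outgoing(_match = nil)
    neighbours
  end
end

c = SketchNode.new(:c, [])
b = SketchNode.new(:b, [c])
a = SketchNode.new(:a, [b, c])

source = [a, b]

# Same accumulation as in the method above.
via_inject = source.inject([]) do |result, node|
  result.concat(node.outgoing)
end

# Equivalent one-liner using Enumerable#flat_map.
via_flat_map = source.flat_map { |node| node.outgoing }

via_inject.map(&:id)    # => [:b, :c, :c]
via_flat_map.map(&:id)  # => [:b, :c, :c]
```

Two things are visible here: a node reachable from several sources appears once per source, and the whole source is walked when the traversal method is called, before the new step is constructed.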

#out_e(match = nil) ⇒ Object

Traverse to the outgoing edges from the source elements.



# File 'lib/mementus/pipeline/step.rb', line 79

def out_e(match=nil)
  outgoing_edges = source.inject([]) do |result, node|
    result.concat(graph.outgoing_edges(node.id, match))
  end

  Step.new(outgoing_edges, Pipe.new(graph), graph)
end

#take(num) ⇒ Object

Returns the given number of values from the sequence.



# File 'lib/mementus/pipeline/step.rb', line 107

def take(num)
  to_enum.take(num)
end