Class: Tap::Joins::Gate

Inherits:
Join
  • Object
show all
Defined in:
lib/tap/joins/gate.rb

Overview

:startdoc::join collects results before the join

Similar to a synchronized merge, but collects all results regardless of where they come from. Gates enque themselves when called as a join, and won’t let results pass until they get run as a node.

% tap run -- load a -- load b -- inspect --[0,1][2].gate
["a", "b"]

Gates are useful in conjunction with iteration where a single task may feed multiple results to a single join; in this case a sync merge doesn’t produce the desired behavior of collecting the results.

% tap run -- load/yaml "[1, 2, 3]" --:i inspect --:.gate inspect
1
2
3
[1, 2, 3]

% tap run -- load/yaml "[1, 2, 3]" --:i inspect --:.sync inspect
1
[1]
2
[2]
3
[3]

When a limit is specified, the gate will collect results up to the limit and then pass the results. Any leftover results are still passed at the end.

% tap run -- load/yaml "[1, 2, 3]" --:i inspect -- inspect --. gate 1 2 --limit 2
1
2
[1, 2]
3
[3]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}, app = Tap::App.instance) ⇒ Gate

Returns a new instance of Gate.



51
52
53
54
# File 'lib/tap/joins/gate.rb', line 51

def initialize(config={}, app=Tap::App.instance)
  super
  @results = nil
end

Instance Attribute Details

#resultsObject (readonly)

An array of results collected thusfar.



47
48
49
# File 'lib/tap/joins/gate.rb', line 47

def results
  @results
end

Instance Method Details

#call(result) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/tap/joins/gate.rb', line 56

def call(result)
  if @results
    # Results are set, so self is already enqued and collecting
    # results.  If the input is the collection, then it's time
    # to dispatch the results and reset.  Otherwise, just
    # collect the input and wait.
    
    if result == @results
      @results = nil
      super(result)
    else
      @results << result
      
      if limit && @results.length >= limit
        super(@results.dup)
        @results.clear
      end
    end
    
  else
    # No results are set, so this is a first call and self is
    # not enqued.  Setup the collection.
    
    @results = [result]
    app.enq(self, @results)
  end
  
  self
end