Class: Tap::Joins::Gate
- Inherits: Join
- Object
- Join
- Tap::Joins::Gate
- Defined in:
- lib/tap/joins/gate.rb
Overview
:startdoc::join collects results before the join
Similar to a synchronized merge, but collects all results regardless of where they come from. Gates enqueue themselves when called as a join, and won’t let results pass until they are run as a node.
% tap run -- load a -- load b -- inspect --[0,1][2].gate
["a", "b"]
Gates are useful in conjunction with iteration where a single task may feed multiple results to a single join; in this case a sync merge doesn’t produce the desired behavior of collecting the results.
% tap run -- load/yaml "[1, 2, 3]" --:i inspect --:.gate inspect
1
2
3
[1, 2, 3]
% tap run -- load/yaml "[1, 2, 3]" --:i inspect --:.sync inspect
1
[1]
2
[2]
3
[3]
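The difference between the two outputs above can be modeled in plain Ruby, without Tap. This is only a sketch of the documented behavior for a single iterating source; `gate_dispatches` and `sync_dispatches` are hypothetical helper names, not part of Tap.

```ruby
# Hypothetical helpers (not part of Tap) that model what the downstream
# node receives in the two examples above.

# A gate holds every iterated result and passes one collection at the end.
def gate_dispatches(results)
  [results.dup]
end

# With a single iterating source, the sync merge passes each result on
# as soon as it arrives, wrapped in its own array.
def sync_dispatches(results)
  results.map { |result| [result] }
end

p gate_dispatches([1, 2, 3])  # prints [[1, 2, 3]]
p sync_dispatches([1, 2, 3])  # prints [[1], [2], [3]]
```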
When a limit is specified, the gate will collect results up to the limit and then pass the results. Any leftover results are still passed at the end.
% tap run -- load/yaml "[1, 2, 3]" --:i inspect -- inspect --. gate 1 2 --limit 2
1
2
[1, 2]
3
[3]
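The grouping in the limit example can be approximated with a few lines of plain Ruby. This is a sketch of the documented output only: `gate_batches` is a hypothetical helper, not part of Tap, and it ignores how the gate enqueues itself, so it may differ from the real join in edge cases.

```ruby
# Hypothetical helper (not part of Tap): groups results the way the
# example above shows, passing a batch whenever `limit` results have
# been collected and passing any leftovers at the end.
def gate_batches(results, limit = nil)
  # Without a limit, everything is passed as one collection.
  return [results.dup] if limit.nil?

  results.each_slice(limit).to_a
end

p gate_batches([1, 2, 3], 2)  # prints [[1, 2], [3]]
p gate_batches([1, 2, 3])     # prints [[1, 2, 3]]
```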
Instance Attribute Summary
- #results ⇒ Object (readonly)
  An array of results collected thus far.
Instance Method Summary
- #call(result) ⇒ Object
- #initialize(config = {}, app = Tap::App.instance) ⇒ Gate (constructor)
  A new instance of Gate.
Constructor Details
#initialize(config = {}, app = Tap::App.instance) ⇒ Gate
Returns a new instance of Gate.
# File 'lib/tap/joins/gate.rb', line 51
def initialize(config={}, app=Tap::App.instance)
  super
  @results = nil
end
Instance Attribute Details
#results ⇒ Object (readonly)
An array of results collected thus far.
# File 'lib/tap/joins/gate.rb', line 47
def results
  @results
end
Instance Method Details
#call(result) ⇒ Object
# File 'lib/tap/joins/gate.rb', line 56
def call(result)
  if @results
    # Results are set, so self is already enqueued and collecting
    # results. If the input is the collection, then it's time
    # to dispatch the results and reset. Otherwise, just
    # collect the input and wait.
    if result == @results
      @results = nil
      super(result)
    else
      @results << result

      if limit && @results.length >= limit
        super(@results.dup)
        @results.clear
      end
    end
  else
    # No results are set, so this is a first call and self is
    # not enqueued. Set up the collection.
    @results = [result]
    app.enq(self, @results)
  end

  self
end
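The control flow of `#call` can be traced with a standalone model that does not depend on Tap. This is a minimal sketch under stated assumptions: `MiniApp`, `MiniGate`, and `dispatched` are hypothetical names, the queue is assumed to be a simple FIFO, and appending to `dispatched` stands in for the `super` call that passes results on to the outputs.

```ruby
# Hypothetical stand-in for the part of the app used here: a FIFO of
# [node, input] pairs. Not a Tap API.
class MiniApp
  def initialize
    @queue = []
  end

  # Schedule a node to be called later with the given input.
  def enq(node, input)
    @queue << [node, input]
  end

  # Call queued nodes in order until the queue is empty.
  def run
    until @queue.empty?
      node, input = @queue.shift
      node.call(input)
    end
  end
end

# Hypothetical model of the gate's collect-then-release logic.
class MiniGate
  attr_reader :results, :dispatched

  def initialize(app, limit = nil)
    @app = app
    @limit = limit
    @results = nil
    @dispatched = [] # stands in for passing results to the output nodes
  end

  def call(result)
    if @results
      if result == @results
        # Called from the queue with its own collection: release and reset.
        @results = nil
        @dispatched << result
      else
        @results << result
        if @limit && @results.length >= @limit
          # Limit reached: pass a copy now and keep collecting.
          @dispatched << @results.dup
          @results.clear
        end
      end
    else
      # First call: start collecting and enqueue self to run later.
      @results = [result]
      @app.enq(self, @results)
    end
    self
  end
end

app = MiniApp.new
gate = MiniGate.new(app, 2)
[1, 2, 3].each { |value| gate.call(value) }
app.run
p gate.dispatched # prints [[1, 2], [3]]
```

The gate enqueues the same array object it keeps collecting into, so when the queue later calls the gate with that array, the comparison succeeds and the leftover results are released.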