Class: Tap::Joins::Sync
- Defined in:
- lib/tap/joins/sync.rb
Overview
:startdoc::join synchronized multi-way join
Sync works the same as Join, but passes the collected results of the inputs (ie an array) to the outputs. The results will not be passed until all of inputs have returned. A collision results if a single input completes twice before the group completes as a whole.
Defined Under Namespace
Classes: Callback, SynchronizeError
Constant Summary collapse
- NIL_VALUE =
NIL_VALUE is used to mark empty slots (nil itself cannot be used because it is a valid result value).
Object.new
Instance Attribute Summary collapse
-
#results ⇒ Object
readonly
An array holding results until the batch is ready to execute.
Attributes inherited from Tap::Join
Attributes inherited from App::Api
Instance Method Summary collapse
-
#call(result, index) ⇒ Object
Call is called by a Callback and stores the result at the specified index in results.
-
#initialize(config = {}, app = Tap::App.current) ⇒ Sync
constructor
A new instance of Sync.
-
#join(inputs, outputs) ⇒ Object
A synchronized join sets a Callback as the join of each input.
-
#reset ⇒ Object
Resets results.
Methods inherited from Tap::Join
#associations, build, #to_spec
Methods inherited from App::Api
#associations, build, help, inherited, #inspect, parse, parse!, parser, #to_spec
Methods included from Signals
#sig, #signal, #signal?, #signals
Methods included from Signals::ModuleMethods
Constructor Details
#initialize(config = {}, app = Tap::App.current) ⇒ Sync
Returns a new instance of Sync.
20 21 22 23 |
# File 'lib/tap/joins/sync.rb', line 20 def initialize(config={}, app=Tap::App.current) super @results = nil end |
Instance Attribute Details
#results ⇒ Object (readonly)
An array holding results until the batch is ready to execute.
18 19 20 |
# File 'lib/tap/joins/sync.rb', line 18 def results @results end |
Instance Method Details
#call(result, index) ⇒ Object
Call is called by a Callback and stores the result at the specified index in results. If the results have all been set, then they are sent to each output.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/tap/joins/sync.rb', line 57 def call(result, index) if result == NIL_VALUE raise "NIL_VALUE cannot be passed as a result" end unless results[index] == NIL_VALUE raise SynchronizeError, "already got a result for: #{inputs[index]}" end results[index] = result unless results.include?(NIL_VALUE) outputs.each {|output| exe(output, results) } reset end end |
#join(inputs, outputs) ⇒ Object
A synchronized join sets a Callback as the join of each input. The callback is responsible for setting the result of each input into the correct ‘results’ slot.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/tap/joins/sync.rb', line 34 def join(inputs, outputs) @inputs.each do |input| input.joins.delete_if do |join| join.kind_of?(Callback) && join.join == self end end if @inputs @inputs = inputs index = 0 inputs.each do |input| input.joins << Callback.new(self, index) index += 1 end if inputs reset @outputs = outputs self end |
#reset ⇒ Object
Resets results. Normally there is no reason to call this method as it will shuffle the arguments being passed through self.
27 28 29 |
# File 'lib/tap/joins/sync.rb', line 27 def reset @results = inputs ? Array.new(inputs.length, NIL_VALUE) : nil end |