Class: Cascading::SubAssembly
- Inherits: Object
- Defined in: lib/cascading/sub_assembly.rb
Overview
Allows you to plug c.p.SubAssemblies (cascading.pipe.SubAssembly) into a cascading.jruby Assembly.
Assumptions:
- You will either use the tail_pipe of the calling Assembly or overwrite its incoming_scopes (as join and union do).
- Your SubAssembly will have only one tail pipe; branching is not supported. This allows you to continue operating on the tail of the SubAssembly within the calling Assembly.
- You will not use nested c.p.SubAssemblies.
This is a low-level tool, so be careful.
Instance Attribute Summary
- #assembly ⇒ Object (readonly)
  Returns the value of attribute assembly.
- #scope ⇒ Object (readonly)
  Returns the value of attribute scope.
- #sub_assembly ⇒ Object (readonly)
  Returns the value of attribute sub_assembly.
- #tail_pipe ⇒ Object (readonly)
  Returns the value of attribute tail_pipe.
Instance Method Summary
- #finalize(pipes, incoming_scopes) ⇒ Object
  Complete the addition of the SubAssembly to the Assembly.
- #initialize(assembly, sub_assembly) ⇒ SubAssembly (constructor)
  Construct a SubAssembly within the given Assembly.
Constructor Details
#initialize(assembly, sub_assembly) ⇒ SubAssembly
Construct a SubAssembly within the given Assembly.

    # File 'lib/cascading/sub_assembly.rb', line 16
    def initialize(assembly, sub_assembly)
      @assembly = assembly
      @sub_assembly = sub_assembly
      @tail_pipe = assembly.tail_pipe
      @scope = assembly.scope

      # Enforces 1 tail pipe assumption
      raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
      raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
    end
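The constructor's two guard clauses can be exercised outside JRuby with plain Ruby stand-ins. This is a minimal sketch: AssemblyStub, SubAssemblyStub, SubAssemblySketch, and construct are hypothetical names invented for illustration, only tail_pipe, scope, and tails are modelled, and tails is a plain Array rather than whatever a real c.p.SubAssembly returns.

```ruby
# Hypothetical stand-ins: only the methods the constructor calls are modelled.
AssemblyStub = Struct.new(:tail_pipe, :scope)
SubAssemblyStub = Struct.new(:tails)

# Hypothetical mirror of SubAssembly#initialize.
class SubAssemblySketch
  attr_reader :assembly, :sub_assembly, :tail_pipe, :scope

  def initialize(assembly, sub_assembly)
    @assembly = assembly
    @sub_assembly = sub_assembly
    # Start from the calling Assembly's current tail and scope.
    @tail_pipe = assembly.tail_pipe
    @scope = assembly.scope

    # Same guards as the original: tails must be set, and there must be exactly one.
    raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
    raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
  end
end

# Returns the error message, or :ok if construction succeeded.
def construct(tails)
  SubAssemblySketch.new(AssemblyStub.new(:upstream, :upstream_scope), SubAssemblyStub.new(tails))
  :ok
rescue RuntimeError => e
  e.message
end

construct(nil)      # => "SubAssembly must call setTails in constructor"
construct([:a, :b]) # => "SubAssembly must set exactly 1 tail in constructor"
construct([:a])     # => :ok
```

A string passed to raise produces a RuntimeError, which is why the sketch rescues that class; the branching case (two tails) is rejected before the SubAssembly ever reaches finalize.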
Instance Attribute Details
#assembly ⇒ Object (readonly)
Returns the value of attribute assembly.
    # File 'lib/cascading/sub_assembly.rb', line 13
    def assembly
      @assembly
    end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
    # File 'lib/cascading/sub_assembly.rb', line 13
    def scope
      @scope
    end
#sub_assembly ⇒ Object (readonly)
Returns the value of attribute sub_assembly.
    # File 'lib/cascading/sub_assembly.rb', line 13
    def sub_assembly
      @sub_assembly
    end
#tail_pipe ⇒ Object (readonly)
Returns the value of attribute tail_pipe.
    # File 'lib/cascading/sub_assembly.rb', line 13
    def tail_pipe
      @tail_pipe
    end
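Because all four readers point at line 13, they are most likely declared with a single attr_reader call, which generates reader methods but no writers. The sketch below uses a hypothetical class, ReadonlyAttrsSketch, to show what "readonly" means in practice.

```ruby
# Hypothetical class declaring its readers the way sub_assembly.rb most
# likely does: one attr_reader call, so no setter methods exist.
class ReadonlyAttrsSketch
  attr_reader :assembly, :sub_assembly, :tail_pipe, :scope

  def initialize(assembly, sub_assembly, tail_pipe, scope)
    @assembly = assembly
    @sub_assembly = sub_assembly
    @tail_pipe = tail_pipe
    @scope = scope
  end
end

s = ReadonlyAttrsSketch.new(:asm, :sub, :tail, :scope)
s.tail_pipe                # => :tail
s.respond_to?(:tail_pipe=) # => false (no writer is generated)
```

Callers can only read these values; inside the class, finalize still updates them by assigning @tail_pipe and @scope directly.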
Instance Method Details
#finalize(pipes, incoming_scopes) ⇒ Object
Complete the addition of the SubAssembly to the Assembly. Propagates Scope through the SubAssembly and updates the tail_pipe and scope of the SubAssembly for passing back to the enclosing Assembly. May accept many incoming pipes, but typically only receives the tail_pipe of the enclosing Assembly. Returns the updated [tail_pipe, scope] pair.
    # File 'lib/cascading/sub_assembly.rb', line 32
    def finalize(pipes, incoming_scopes)
      # Build adjacency list for sub_assembly
      graph = {}
      adjacency_list(pipes, sub_assembly.tails.first, graph)

      # Group adjacency list by next_pipe
      incoming_edges = graph.inject({}) do |incoming_edges, (prev_pipe, next_pipe)|
        incoming_edges[next_pipe] ||= []
        incoming_edges[next_pipe] << prev_pipe
        incoming_edges
      end

      # Propagate scope through sub_assembly graph
      inputs = Hash[*pipes.zip(incoming_scopes).flatten]
      while !incoming_edges.empty?
        incoming_edges.each do |next_pipe, prev_pipes|
          if (prev_pipes - inputs.keys).empty?
            input_scopes = prev_pipes.inject([]) do |input_scopes, prev_pipe|
              input_scopes << inputs.delete(prev_pipe)
              input_scopes
            end
            inputs[next_pipe] = Scope.outgoing_scope(next_pipe, input_scopes)
            incoming_edges.delete(next_pipe)
          end
        end
      end
      raise "Incoming edges did not capture all inputs; #{inputs.size} remaining" unless inputs.size == 1
      @tail_pipe, @scope = inputs.first
      raise "Expected scope propagation to end with tail pipe; ended with '#{@tail_pipe}' instead" unless sub_assembly.tails.first == @tail_pipe

      # This is the same "fix" applied to our field name metadata after a
      # sequence of Everies in Aggregations. It just so happens that all of
      # CountBy, SumBy, and AverageBy end with Everies. However, it appears to
      # only be necessary for AverageBy (which has different declaredFields for
      # its partials than its final, unlike the other two). It would be nice
      # to track this issue down so that we can remove this hack from here and
      # Aggregations#finalize.
      discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
      @scope = Scope.outgoing_scope(discard_each, [scope])

      [@tail_pipe, @scope]
    end
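The scope-propagation loop in finalize is essentially a topological walk: a pipe's outgoing scope is computed only once every upstream pipe feeding it has a scope, and exactly one scope (the tail's) should remain at the end. The sketch below reproduces that loop in plain Ruby under stated assumptions: pipes are symbols, a "scope" is an array of field names, outgoing_scope is a hypothetical stand-in for Scope.outgoing_scope, and graph is assumed to hold prev_pipe => next_pipe entries, as the inject over (prev_pipe, next_pipe) pairs suggests adjacency_list produces. It seeds inputs with to_h because its array-valued scopes would be splatted by the original's flatten, and it adds a stall guard that the original loop does not have.

```ruby
# Hypothetical stand-in for Scope.outgoing_scope: here a "scope" is just an
# array of field names, and each pipe appends its own name as a field.
def outgoing_scope(pipe, input_scopes)
  input_scopes.flatten + [pipe.to_s]
end

# graph maps each pipe to the single pipe it feeds (prev_pipe => next_pipe);
# pipes/incoming_scopes are the pipes entering the sub-assembly and their scopes.
def propagate(graph, pipes, incoming_scopes, tail)
  # Group edges by their target: next_pipe => [prev_pipe, ...]
  incoming_edges = graph.each_with_object({}) do |(prev_pipe, next_pipe), acc|
    (acc[next_pipe] ||= []) << prev_pipe
  end

  # Seed the ready set with the scopes of the incoming pipes.
  inputs = pipes.zip(incoming_scopes).to_h

  until incoming_edges.empty?
    progressed = false
    incoming_edges.keys.each do |next_pipe|
      prev_pipes = incoming_edges[next_pipe]
      # Wait until every upstream pipe of next_pipe has a scope.
      next unless (prev_pipes - inputs.keys).empty?

      # Consume the upstream scopes and compute next_pipe's outgoing scope.
      input_scopes = prev_pipes.map { |prev_pipe| inputs.delete(prev_pipe) }
      inputs[next_pipe] = outgoing_scope(next_pipe, input_scopes)
      incoming_edges.delete(next_pipe)
      progressed = true
    end
    # Extra guard in this sketch only: fail instead of looping forever.
    raise 'Scope propagation stalled' unless progressed
  end

  # Exactly one scope should remain, and it should belong to the tail.
  raise "Incoming edges did not capture all inputs; #{inputs.size} remaining" unless inputs.size == 1
  tail_pipe, scope = inputs.first
  raise "Expected scope propagation to end with tail pipe; ended with '#{tail_pipe}' instead" unless tail_pipe == tail
  [tail_pipe, scope]
end

# A join-like sub-assembly: two incoming pipes merge, then feed one more pipe.
tail, scope = propagate({ left: :join, right: :join, join: :every },
                        [:left, :right], [['a'], ['b']], :every)
p tail  # => :every
p scope # => ["a", "b", "join", "every"]
```

The two-input example shows why finalize accepts many incoming pipes: the merge pipe is only processed once both of its upstream scopes are available, after which the single remaining entry is the tail.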