Class: Cascading::SubAssembly
- Inherits: Object
  - Object
  - Cascading::SubAssembly
- Defined in: lib/cascading/sub_assembly.rb
Overview
Allows you to plug c.p.SubAssemblies (cascading.pipe.SubAssembly) into a cascading.jruby Assembly.
Assumptions:
- You will either use the tail_pipe of the calling Assembly or overwrite its incoming_scopes (as do join and union).
- Your subassembly will have only 1 tail pipe; branching is not supported. This allows you to continue operating upon the tail of the SubAssembly within the calling Assembly.
- You will not use nested c.p.SubAssemblies.
This is a low-level tool, so be careful.
Instance Attribute Summary
- #assembly ⇒ Object (readonly)
  Returns the value of attribute assembly.
- #scope ⇒ Object (readonly)
  Returns the value of attribute scope.
- #sub_assembly ⇒ Object (readonly)
  Returns the value of attribute sub_assembly.
- #tail_pipe ⇒ Object (readonly)
  Returns the value of attribute tail_pipe.
Instance Method Summary
- #finalize(pipes, incoming_scopes) ⇒ Object
- #initialize(assembly, sub_assembly) ⇒ SubAssembly (constructor)
  A new instance of SubAssembly.
Constructor Details
#initialize(assembly, sub_assembly) ⇒ SubAssembly
Returns a new instance of SubAssembly.
    # File 'lib/cascading/sub_assembly.rb', line 18

    def initialize(assembly, sub_assembly)
      @assembly = assembly
      @sub_assembly = sub_assembly
      @tail_pipe = assembly.tail_pipe
      @scope = assembly.scope

      # Enforces 1 tail pipe assumption
      raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
      raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
    end
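The two guard clauses in the constructor can be tried out without Cascading on the classpath. The following is only a sketch: `FakeSubAssembly`, `check_single_tail`, and `tail_check_error` are hypothetical stand-ins introduced for illustration and are not part of cascading.jruby. It assumes nothing about the wrapped object beyond a `tails` reader, which is all the constructor checks call.

```ruby
# Hypothetical stand-in for a c.p.SubAssembly; only the tails reader is modelled.
FakeSubAssembly = Struct.new(:tails)

# Mirrors the two guard clauses in SubAssembly#initialize and returns the
# single tail when both checks pass.
def check_single_tail(sub_assembly)
  raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
  raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
  sub_assembly.tails.first
end

# Helper for the illustration: returns the error message, or nil on success.
def tail_check_error(sub_assembly)
  check_single_tail(sub_assembly)
  nil
rescue RuntimeError => e
  e.message
end

ok        = FakeSubAssembly.new([:tail])          # exactly one tail: accepted
no_tails  = FakeSubAssembly.new(nil)              # tails never set: rejected
two_tails = FakeSubAssembly.new([:left, :right])  # branching: rejected
```

Rejecting anything other than exactly one tail is what lets the calling Assembly keep a single tail_pipe and scope after the sub-assembly has been plugged in.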
Instance Attribute Details
#assembly ⇒ Object (readonly)
Returns the value of attribute assembly.
    # File 'lib/cascading/sub_assembly.rb', line 16

    def assembly
      @assembly
    end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
    # File 'lib/cascading/sub_assembly.rb', line 16

    def scope
      @scope
    end
#sub_assembly ⇒ Object (readonly)
Returns the value of attribute sub_assembly.
    # File 'lib/cascading/sub_assembly.rb', line 16

    def sub_assembly
      @sub_assembly
    end
#tail_pipe ⇒ Object (readonly)
Returns the value of attribute tail_pipe.
    # File 'lib/cascading/sub_assembly.rb', line 16

    def tail_pipe
      @tail_pipe
    end
Instance Method Details
#finalize(pipes, incoming_scopes) ⇒ Object
    # File 'lib/cascading/sub_assembly.rb', line 29

    def finalize(pipes, incoming_scopes)
      # Build adjacency list for sub_assembly
      graph = {}
      adjacency_list(pipes, sub_assembly.tails.first, graph)

      # Group adjacency list by next_pipe
      incoming_edges = graph.inject({}) do |incoming_edges, (prev_pipe, next_pipe)|
        incoming_edges[next_pipe] ||= []
        incoming_edges[next_pipe] << prev_pipe
        incoming_edges
      end

      # Propagate scope through sub_assembly graph
      inputs = Hash[*pipes.zip(incoming_scopes).flatten]
      while !incoming_edges.empty?
        incoming_edges.each do |next_pipe, prev_pipes|
          if (prev_pipes - inputs.keys).empty?
            input_scopes = prev_pipes.inject([]) do |input_scopes, prev_pipe|
              input_scopes << inputs.delete(prev_pipe)
              input_scopes
            end
            inputs[next_pipe] = Scope.outgoing_scope(next_pipe, input_scopes)
            incoming_edges.delete(next_pipe)
          end
        end
      end

      raise "Incoming edges did not capture all inputs; #{inputs.size} remaining" unless inputs.size == 1
      @tail_pipe, @scope = inputs.first
      raise "Expected scope propagation to end with tail pipe; ended with '#{@tail_pipe}' instead" unless sub_assembly.tails.first == @tail_pipe

      # This is the same "fix" applied to our field name metadata after a
      # sequence of Everies in Aggregations. It just so happens that all of
      # CountBy, SumBy, and AverageBy end with Everies. However, it appears to
      # only be necessary for AverageBy (which has different declaredFields for
      # its partials than its final, unlike the other two). It would be nice
      # to track this issue down so that we can remove this hack from here and
      # Aggregations#finalize.
      discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
      @scope = Scope.outgoing_scope(discard_each, [scope])
      [@tail_pipe, @scope]
    end
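The grouping and propagation steps in finalize can be illustrated in plain Ruby. This is a sketch under stated assumptions, not the library's implementation: pipes are modelled as symbols, a scope as an array of field names, and `outgoing_scope` below is a hypothetical stand-in for Scope.outgoing_scope that simply unions its inputs. `propagate` is also a name introduced here. Unlike the original loop, the sketch raises when no pipe is ready, rather than looping again.

```ruby
# Hypothetical stand-in for Scope.outgoing_scope: a "scope" is an array of
# field names, and a pipe's outgoing scope is the union of its input scopes.
def outgoing_scope(_pipe, input_scopes)
  input_scopes.flatten.uniq
end

# graph:  hash of prev_pipe => next_pipe (the adjacency list).
# inputs: hash of head pipe => incoming scope.
# Returns the inputs hash after every edge has been consumed.
def propagate(graph, inputs)
  inputs = inputs.dup

  # Group the adjacency list by next_pipe.
  incoming_edges = graph.each_with_object({}) do |(prev_pipe, next_pipe), grouped|
    (grouped[next_pipe] ||= []) << prev_pipe
  end

  until incoming_edges.empty?
    # A pipe is ready once every pipe feeding it has a known scope.
    ready = incoming_edges.select { |_, prev_pipes| (prev_pipes - inputs.keys).empty? }
    raise 'No pipe is ready; graph is not reachable from the inputs' if ready.empty?

    ready.each do |next_pipe, prev_pipes|
      # Consume the scopes of the feeding pipes and record the result.
      input_scopes = prev_pipes.map { |prev_pipe| inputs.delete(prev_pipe) }
      inputs[next_pipe] = outgoing_scope(next_pipe, input_scopes)
      incoming_edges.delete(next_pipe)
    end
  end

  inputs
end

# Two heads feed a join, which feeds the single tail.
graph  = { lhs: :join, rhs: :join, join: :tail }
inputs = { lhs: ['id', 'name'], rhs: ['id', 'score'] }
result = propagate(graph, inputs)
```

In this example `result` holds a single entry keyed by `:tail`, which corresponds to the `inputs.size == 1` check that finalize performs before assigning @tail_pipe and @scope.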