Class: Cascading::SubAssembly

Inherits:
Object
Defined in:
lib/cascading/sub_assembly.rb

Overview

Allows you to plug c.p.SubAssemblies (cascading.pipe.SubAssembly subclasses) into a cascading.jruby Assembly.

Assumptions:

  • You will either use the tail_pipe of the calling Assembly or overwrite its incoming_scopes (as join and union do).

  • Your SubAssembly will have only one tail pipe; branching is not supported. This allows you to continue operating on the tail of the SubAssembly within the calling Assembly.

  • You will not use nested c.p.SubAssemblies.

This is a low-level tool, so be careful.

Constructor Details

#initialize(assembly, sub_assembly) ⇒ SubAssembly

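Construct a SubAssembly within the given Assembly. The constructor captures the Assembly's current tail_pipe and scope, and raises unless sub_assembly has set exactly one tail.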

# File 'lib/cascading/sub_assembly.rb', line 16

def initialize(assembly, sub_assembly)
  @assembly = assembly
  @sub_assembly = sub_assembly
  @tail_pipe = assembly.tail_pipe
  @scope = assembly.scope

  # Enforces 1 tail pipe assumption
  raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
  raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
end
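The constructor's checks can be exercised outside JRuby with a pure-Ruby sketch. FakeAssembly, FakeSubAssembly, and SubAssemblySketch are hypothetical stand-ins, not cascading.jruby or Cascading classes; only the members the constructor reads are modeled, and tails plays the role of the Java getTails accessor as JRuby exposes it. Note that right after construction tail_pipe is still the caller's tail; #finalize is what replaces it.

```ruby
# Hypothetical stand-ins: only the members read by the constructor are modeled.
FakeAssembly = Struct.new(:tail_pipe, :scope)
FakeSubAssembly = Struct.new(:tails)

# Mirrors the checks in Cascading::SubAssembly#initialize (sketch, not the real class).
class SubAssemblySketch
  attr_reader :assembly, :sub_assembly, :tail_pipe, :scope

  def initialize(assembly, sub_assembly)
    @assembly = assembly
    @sub_assembly = sub_assembly
    @tail_pipe = assembly.tail_pipe
    @scope = assembly.scope

    # Enforces the single-tail assumption
    raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
    raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
  end
end

assembly = FakeAssembly.new('events', 'events-scope')

ok = SubAssemblySketch.new(assembly, FakeSubAssembly.new(['count_by']))
puts ok.tail_pipe # prints "events": still the caller's tail until finalize runs

# Missing tails, no tails, and two tails are all rejected
[nil, [], %w[left right]].each do |tails|
  begin
    SubAssemblySketch.new(assembly, FakeSubAssembly.new(tails))
  rescue RuntimeError => e
    puts "#{tails.inspect}: #{e.message}"
  end
end
```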

Instance Attribute Details

#assembly ⇒ Object (readonly)

Returns the value of attribute assembly.

# File 'lib/cascading/sub_assembly.rb', line 13

def assembly
  @assembly
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.

# File 'lib/cascading/sub_assembly.rb', line 13

def scope
  @scope
end

#sub_assembly ⇒ Object (readonly)

Returns the value of attribute sub_assembly.

# File 'lib/cascading/sub_assembly.rb', line 13

def sub_assembly
  @sub_assembly
end

#tail_pipe ⇒ Object (readonly)

Returns the value of attribute tail_pipe.

# File 'lib/cascading/sub_assembly.rb', line 13

def tail_pipe
  @tail_pipe
end

Instance Method Details

#finalize(pipes, incoming_scopes) ⇒ Object

Complete the addition of the SubAssembly to the Assembly. Propagates Scope through the SubAssembly and updates the tail_pipe of the SubAssembly for passing back to the enclosing Assembly. May accept many incoming pipes, but typically receives only the tail_pipe of the enclosing Assembly. Returns the pair [tail_pipe, scope].

# File 'lib/cascading/sub_assembly.rb', line 32

def finalize(pipes, incoming_scopes)
  # Build adjacency list for sub_assembly
  graph = {}
  adjacency_list(pipes, sub_assembly.tails.first, graph)

  # Group adjacency list by next_pipe
  incoming_edges = graph.inject({}) do |incoming_edges, (prev_pipe, next_pipe)|
    incoming_edges[next_pipe] ||= []
    incoming_edges[next_pipe] << prev_pipe
    incoming_edges
  end

  # Propagate scope through sub_assembly graph
  inputs = Hash[*pipes.zip(incoming_scopes).flatten]
  while !incoming_edges.empty?
    incoming_edges.each do |next_pipe, prev_pipes|
      if (prev_pipes - inputs.keys).empty?
        input_scopes = prev_pipes.inject([]) do |input_scopes, prev_pipe|
          input_scopes << inputs.delete(prev_pipe)
          input_scopes
        end
        inputs[next_pipe] = Scope.outgoing_scope(next_pipe, input_scopes)
        incoming_edges.delete(next_pipe)
      end
    end
  end

  raise "Incoming edges did not capture all inputs; #{inputs.size} remaining" unless inputs.size == 1
  @tail_pipe, @scope = inputs.first
  raise "Expected scope propagation to end with tail pipe; ended with '#{@tail_pipe}' instead" unless sub_assembly.tails.first == @tail_pipe

  # This is the same "fix" applied to our field name metadata after a
  # sequence of Everies in Aggregations.  It just so happens that all of
  # CountBy, SumBy, and AverageBy end with Everies.  However, it appears to
  # only be necessary for AverageBy (which has different declaredFields for
  # its partials than its final, unlike the other two).  It would be nice
  # to track this issue down so that we can remove this hack from here and
  # Aggregations#finalize.
  discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
  @scope = Scope.outgoing_scope(discard_each, [scope])

  [@tail_pipe, @scope]
end
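The scope propagation loop in #finalize is essentially a topological traversal: a pipe's scope is computed only once the scopes of all of its predecessors are available. The sketch below reproduces that loop in plain Ruby under stated assumptions: pipes are strings, graph maps each pipe to the single pipe it feeds (the shape implied by the grouping step in #finalize), and outgoing_scope is a hypothetical stand-in for Scope.outgoing_scope that only records which scopes flowed into each pipe. Unlike the original loop, it raises if a pass makes no progress instead of spinning forever.

```ruby
# Hypothetical stand-in for Scope.outgoing_scope: records the inputs of each pipe
def outgoing_scope(pipe, input_scopes)
  "#{pipe}(#{input_scopes.join(', ')})"
end

def propagate(pipes, incoming_scopes, graph)
  # Group the prev_pipe => next_pipe adjacency list by next_pipe
  incoming_edges = graph.each_with_object({}) do |(prev_pipe, next_pipe), acc|
    (acc[next_pipe] ||= []) << prev_pipe
  end

  # Seed with the scopes of the incoming (external) pipes
  inputs = pipes.zip(incoming_scopes).to_h
  until incoming_edges.empty?
    # A pipe is ready once every predecessor has a computed scope
    ready = incoming_edges.select { |_, prev_pipes| (prev_pipes - inputs.keys).empty? }
    # Guard added in this sketch (not in #finalize): fail instead of looping forever
    raise 'Graph has a pipe whose inputs can never be satisfied' if ready.empty?

    ready.each do |next_pipe, prev_pipes|
      input_scopes = prev_pipes.map { |prev_pipe| inputs.delete(prev_pipe) }
      inputs[next_pipe] = outgoing_scope(next_pipe, input_scopes)
      incoming_edges.delete(next_pipe)
    end
  end

  raise "Incoming edges did not capture all inputs; #{inputs.size} remaining" unless inputs.size == 1
  inputs.first # [tail_pipe, scope]
end

graph = { 'lhs' => 'join', 'rhs' => 'join', 'join' => 'every' }
tail, scope = propagate(%w[lhs rhs], %w[L R], graph)
puts tail  # prints "every"
puts scope # prints "every(join(L, R))"
```

Processing one wave of ready pipes per pass keeps the loop easy to reason about, and pipes.zip(incoming_scopes).to_h pairs each pipe with its scope without the flatten step used in #finalize, which would also split apart any scope value that happened to be an Array.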