Class: Cascading::SubAssembly

Inherits:
Object
Defined in:
lib/cascading/sub_assembly.rb

Overview

Allows you to plug c.p.SubAssemblies (cascading.pipe.SubAssembly) into a cascading.jruby Assembly.

Assumptions:

  • You will either use the tail_pipe of the calling Assembly, or overwrite its incoming_scopes (as join and union do)
  • Your subassembly will have only 1 tail pipe; branching is not supported. This allows you to continue operating on the tail of the SubAssembly within the calling Assembly
  • You will not use nested c.p.SubAssemblies

This is a low-level tool, so be careful.

Constructor Details

#initialize(assembly, sub_assembly) ⇒ SubAssembly

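Returns a new instance of SubAssembly. Records the calling assembly's tail_pipe and scope, and raises unless sub_assembly has called setTails with exactly one tail.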

# File 'lib/cascading/sub_assembly.rb', line 18

def initialize(assembly, sub_assembly)
  @assembly = assembly
  @sub_assembly = sub_assembly
  @tail_pipe = assembly.tail_pipe
  @scope = assembly.scope

  # Enforces 1 tail pipe assumption
  raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
  raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
end
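
The two guard clauses in the constructor can be exercised without Cascading on the classpath. In this sketch, FakeSubAssembly is a hypothetical Struct standing in for a c.p.SubAssembly (only its tails are modelled), and check_tails! is a hypothetical helper that mirrors just the checks initialize performs on sub_assembly.tails.

```ruby
# Hypothetical stand-in for a c.p.SubAssembly: only #tails is modelled.
FakeSubAssembly = Struct.new(:tails)

# Hypothetical helper mirroring the two guard clauses in SubAssembly#initialize.
def check_tails!(sub_assembly)
  raise 'SubAssembly must call setTails in constructor' unless sub_assembly.tails
  raise 'SubAssembly must set exactly 1 tail in constructor' unless sub_assembly.tails.size == 1
  sub_assembly.tails.first
end

check_tails!(FakeSubAssembly.new(['tail'])) # returns "tail"

begin
  check_tails!(FakeSubAssembly.new(nil)) # tails never set
rescue RuntimeError => e
  puts e.message # prints "SubAssembly must call setTails in constructor"
end

begin
  check_tails!(FakeSubAssembly.new(%w[a b])) # branching: two tails
rescue RuntimeError => e
  puts e.message # prints "SubAssembly must set exactly 1 tail in constructor"
end
```

A bare string passed to raise produces a RuntimeError, which is why the rescues above match it.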

Instance Attribute Details

#assembly ⇒ Object (readonly)

Returns the value of attribute assembly.

# File 'lib/cascading/sub_assembly.rb', line 16

def assembly
  @assembly
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.

# File 'lib/cascading/sub_assembly.rb', line 16

def scope
  @scope
end

#sub_assembly ⇒ Object (readonly)

Returns the value of attribute sub_assembly.

# File 'lib/cascading/sub_assembly.rb', line 16

def sub_assembly
  @sub_assembly
end

#tail_pipe ⇒ Object (readonly)

Returns the value of attribute tail_pipe.

# File 'lib/cascading/sub_assembly.rb', line 16

def tail_pipe
  @tail_pipe
end
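
All four readers are reported at line 16 of lib/cascading/sub_assembly.rb and carry YARD's default "Returns the value of attribute" docstring. That suggests, though this page does not confirm it, that they come from a single attr_reader declaration. A minimal sketch of that equivalence, using a hypothetical ReaderDemo class:

```ruby
# Hypothetical class: one attr_reader line defines four read-only accessors,
# equivalent to the four explicit reader methods documented above.
class ReaderDemo
  attr_reader :assembly, :sub_assembly, :tail_pipe, :scope

  def initialize(assembly, sub_assembly, tail_pipe, scope)
    @assembly = assembly
    @sub_assembly = sub_assembly
    @tail_pipe = tail_pipe
    @scope = scope
  end
end

demo = ReaderDemo.new(:asm, :sub, :tail, :scope)
demo.tail_pipe                # => :tail
demo.respond_to?(:tail_pipe=) # => false (no public writer is defined)
```

"readonly" here only means no public writer exists; SubAssembly#finalize still reassigns @tail_pipe and @scope internally.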

Instance Method Details

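#finalize(pipes, incoming_scopes) ⇒ Object

Propagates scope through the sub-assembly's pipe graph, starting from the pipes that feed it (each paired with the corresponding entry of incoming_scopes), and returns [tail_pipe, scope]. As a side effect it overwrites this object's tail_pipe and scope. Raises if propagation leaves more than one pipe, or ends at a pipe other than the single tail set in the constructor. The final scope is passed through an Identity Each over all fields; see the comment in the source for why.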

# File 'lib/cascading/sub_assembly.rb', line 29

def finalize(pipes, incoming_scopes)
  # Build adjacency list for sub_assembly
  graph = {}
  adjacency_list(pipes, sub_assembly.tails.first, graph)

  # Group adjacency list by next_pipe
  incoming_edges = graph.inject({}) do |incoming_edges, (prev_pipe, next_pipe)|
    incoming_edges[next_pipe] ||= []
    incoming_edges[next_pipe] << prev_pipe
    incoming_edges
  end

  # Propagate scope through sub_assembly graph
  inputs = Hash[*pipes.zip(incoming_scopes).flatten]
  while !incoming_edges.empty?
    incoming_edges.each do |next_pipe, prev_pipes|
      if (prev_pipes - inputs.keys).empty?
        input_scopes = prev_pipes.inject([]) do |input_scopes, prev_pipe|
          input_scopes << inputs.delete(prev_pipe)
          input_scopes
        end
        inputs[next_pipe] = Scope.outgoing_scope(next_pipe, input_scopes)
        incoming_edges.delete(next_pipe)
      end
    end
  end

  raise "Incoming edges did not capture all inputs; #{inputs.size} remaining" unless inputs.size == 1
  @tail_pipe, @scope = inputs.first
  raise "Expected scope propagation to end with tail pipe; ended with '#{@tail_pipe}' instead" unless sub_assembly.tails.first == @tail_pipe

  # This is the same "fix" applied to our field name metadata after a
  # sequence of Everies in Aggregations.  It just so happens that all of
  # CountBy, SumBy, and AverageBy end with Everies.  However, it appears to
  # only be necessary for AverageBy (which has different declaredFields for
  # its partials than its final, unlike the other two).  It would be nice
  # to track this issue down so that we can remove this hack from here and
  # Aggregations#finalize.
  discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
  @scope = Scope.outgoing_scope(discard_each, [scope])

  [@tail_pipe, @scope]
end
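
The loop in finalize is dependency-driven propagation: a pipe's outgoing scope is computed only once the scopes of all its predecessors are known, and those predecessor scopes are consumed (deleted from inputs) so that only the tail remains at the end. Because graph is iterated as prev_pipe => next_pipe pairs, each pipe has a single successor, which is consistent with the no-branching assumption. The sketch below reproduces the algorithm in plain Ruby under stated assumptions: pipes are strings, and propagate and outgoing_scope are hypothetical names (outgoing_scope only records which scopes were combined, unlike the real Scope.outgoing_scope). It also differs from the original in two small ways: it collects the ready pipes before mutating anything, and it adds a no-progress guard, since the original while loop would spin forever if some predecessor scope never became available.

```ruby
# Hypothetical stand-in for Scope.outgoing_scope: describes the pipe and the
# scopes it consumed instead of computing real field metadata.
def outgoing_scope(pipe, input_scopes)
  "#{pipe}(#{input_scopes.join(', ')})"
end

# graph maps each pipe to its single successor (prev_pipe => next_pipe).
def propagate(graph, heads, head_scopes, tail)
  # Invert the adjacency list: next_pipe => [prev_pipe, ...]
  incoming_edges = Hash.new { |h, k| h[k] = [] }
  graph.each { |prev_pipe, next_pipe| incoming_edges[next_pipe] << prev_pipe }

  inputs = Hash[heads.zip(head_scopes)]
  until incoming_edges.empty?
    # A pipe is ready once every predecessor has a known scope.
    ready = incoming_edges.select { |_, prevs| (prevs - inputs.keys).empty? }
    # Guard added in this sketch (not in the original): fail instead of looping forever.
    raise 'No pipe became ready; some input scope is missing' if ready.empty?

    ready.each do |next_pipe, prev_pipes|
      # Consume predecessor scopes so only the tail is left at the end.
      input_scopes = prev_pipes.map { |prev| inputs.delete(prev) }
      inputs[next_pipe] = outgoing_scope(next_pipe, input_scopes)
      incoming_edges.delete(next_pipe)
    end
  end

  raise "#{inputs.size} inputs remaining" unless inputs.size == 1
  tail_pipe, scope = inputs.first
  raise "ended with #{tail_pipe}, expected #{tail}" unless tail_pipe == tail
  [tail_pipe, scope]
end

# Usage: two inputs joined, then aggregated (pipe names are illustrative).
graph = {
  'lhs' => 'lhs_each', 'lhs_each' => 'join',
  'rhs' => 'join', 'join' => 'every'
}
p propagate(graph, %w[lhs rhs], %w[L R], 'every')
# prints ["every", "every(join(lhs_each(L), R))"]
```

Building inputs with Hash[heads.zip(head_scopes)] avoids the flatten used in the original, which would also split apart any scope that happened to be an Array.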