Class: Cascading::Assembly
- Defined in: lib/cascading/assembly.rb
Overview
An Assembly is a sequence of Cascading pipes (Each, GroupBy, CoGroup, Every, and SubAssembly). This class is your primary mechanism for doing work within a flow. It provides all the functions and filters you apply to a pipe (Eaches), as well as group_by, union, and join. For aggregators and buffers, see Aggregations.
Function and filter DSL rules:
- Use positional arguments for required parameters
- Use options = {} for optional parameters
- Use *args sparingly, specifically when you need to accept a varying-length list of fields
- If you require both a varying-length list of fields and optional parameters, see the Array#extract_options! extension
- If you choose to name a required parameter, add it to options = {} and raise an exception if the caller does not provide it
- If a required parameter may be provided under any one of a set of option names, raise an exception if the caller does not provide at least one of them (see :function and :filter in Assembly#each for an example)
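The rules above can be illustrated with a small plain-Ruby sketch. It is not code from the gem. The Array#extract_options! body is a minimal stand-in, assumed to pop a trailing Hash or else return {}. apply_operation is a hypothetical method name.

```ruby
# Minimal stand-in for the Array#extract_options! extension (assumed
# semantics: pop a trailing Hash off the array, else return a new {}).
class Array
  def extract_options!
    last.is_a?(Hash) ? pop : {}
  end
end

# Hypothetical DSL method following the rules above:
# - a varying-length list of field names via *args_with_options
# - optional parameters in a trailing options Hash
# - a required value that may be supplied under one of two option names
def apply_operation(*args_with_options)
  options = args_with_options.extract_options!
  field_names = args_with_options
  operation = options[:function] || options[:filter]
  raise ArgumentError, 'apply_operation requires either :function or :filter' unless operation

  { :fields => field_names, :operation => operation, :output => options[:output] }
end

apply_operation('a', 'b', :function => :upcase)
```

Calling apply_operation('a') with neither option raises ArgumentError. This follows the same rule that Assembly#each applies to :function and :filter.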
Function and filter DSL standard optional parameter names:
- input: c.p.Each argument selector
- into: c.o.Operation field declaration
- output: c.p.Each output selector
A note on aliases: when a DSL method uniquely wraps a single Cascading operation, we attempt to provide an alias that matches the Cascading operation. Cascading operation names are often nouns, however, and verbs read better in a dataflow DSL. The verb is therefore the primary name and the alias carries the Cascading name (for example, join is also available as co_group).
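A toy sketch of this convention in plain Ruby, using a hypothetical class rather than the gem's Assembly. Ruby's alias keyword makes both names call the same method body.

```ruby
# Hypothetical toy class illustrating the aliasing convention: the verb
# (join) is the primary DSL name, and an alias carries the Cascading
# operation name (CoGroup becomes co_group).
class AliasDemo
  def join(*assembly_names)
    "CoGroup(#{assembly_names.join(', ')})"
  end
  alias co_group join
end

AliasDemo.new.co_group('left', 'right') # same result as join('left', 'right')
```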
Instance Attribute Summary
- #head_pipe ⇒ Object (readonly)
  Returns the value of attribute head_pipe.
- #tail_pipe ⇒ Object (readonly)
  Returns the value of attribute tail_pipe.
Attributes inherited from Node
#child_names, #children, #last_child, #name, #parent
Instance Method Summary
- #assert(assertion, options = {}) ⇒ Object
  Builds an each assertion pipe given a c.o.a.Assertion and adds it to the current Assembly.
- #assert_not_null(options = {}) ⇒ Object
  Builds a pipe that asserts none of the fields in the tuple are null.
- #assert_size_equals(size, options = {}) ⇒ Object
  Builds a pipe that asserts the size of the tuple is the specified size.
- #branch(name, &block) ⇒ Object
  Builds a child Assembly that branches this Assembly given a name and block.
- #debug_scope ⇒ Object
  Prints information about the scope of this Assembly at the point at which it is called.
- #describe(offset = '') ⇒ Object
  Produces a textual description of this Assembly.
- #each(*args_with_options) ⇒ Object
  Builds a basic each pipe and adds it to the current Assembly.
- #group_by(*args_with_options, &block) ⇒ Object
  Builds a new GroupBy pipe that groups on the fields given in args_with_options.
- #hash_join(*args_with_options) ⇒ Object
  Builds a HashJoin pipe.
- #initialize(name, parent, outgoing_scopes = {}) ⇒ Assembly (constructor)
  Do not use this constructor directly; instead, use Flow#assembly or Assembly#branch to build assemblies.
- #inner_join(*args_with_options, &block) ⇒ Object
  Builds an inner join (CoGroup) pipe.
- #join(*args_with_options, &block) ⇒ Object (also: #co_group)
  Builds a join (CoGroup) pipe.
- #left_join(*args_with_options, &block) ⇒ Object
  Builds a left join (CoGroup) pipe.
- #outer_join(*args_with_options, &block) ⇒ Object
  Builds an outer join (CoGroup) pipe.
- #parent_flow ⇒ Object
  Returns the parent flow of this Assembly rather than its immediate parent.
- #right_join(*args_with_options, &block) ⇒ Object
  Builds a right join (CoGroup) pipe.
- #scope ⇒ Object
  Accesses the outgoing scope of this Assembly at the point at which it is called.
- #sub_assembly(sub_assembly, pipes = [tail_pipe], incoming_scopes = [scope]) ⇒ Object
  Allows you to plug c.p.SubAssemblies into an Assembly under certain assumptions.
- #to_s ⇒ Object
  Returns a string describing this Assembly, including its name, head pipe, and tail pipe.
- #union(*args_with_options, &block) ⇒ Object (also: #union_pipes)
  Unifies multiple incoming pipes sharing the same field structure using a GroupBy.
Methods included from TextOperations
#format_date, #join_fields, #parse_date
Methods included from RegexOperations
#match_rows, #parse, #replace, #split, #split_rows
Methods included from FilterOperations
#filter, #filter_not_null, #filter_null, #reject, #where
Methods included from IdentityOperations
#cast, #copy, #discard, #pass, #project, #rename
Methods included from Operations
#debug, #insert, #null_indicator, #regex_contains, #set_value, #ungroup
Methods inherited from Node
#add_child, #find_child, #qualified_name, #root
Constructor Details
#initialize(name, parent, outgoing_scopes = {}) ⇒ Assembly
Do not use this constructor directly; instead, use Flow#assembly or Assembly#branch to build assemblies.
Builds an Assembly given a name, parent, and optional outgoing_scopes (necessary only for branching).
An assembly’s name is quite important as it will determine:
- The sources from which it will read, if any
- The name to be used in joins or unions downstream
- The name to be used to sink the output of the assembly downstream
# File 'lib/cascading/assembly.rb', line 48
def initialize(name, parent, outgoing_scopes = {})
  super(name, parent)
  @outgoing_scopes = outgoing_scopes
  if parent.kind_of?(Assembly)
    @head_pipe = Java::CascadingPipe::Pipe.new(name, parent.tail_pipe)
    # Copy to allow destructive update of name
    @outgoing_scopes[name] = parent.scope.copy
    scope.scope.name = name
  else # Parent is a Flow
    @head_pipe = Java::CascadingPipe::Pipe.new(name)
    @outgoing_scopes[name] ||= Scope.empty_scope(name)
  end
  @tail_pipe = head_pipe
  @incoming_scopes = [scope]
end
Instance Attribute Details
#head_pipe ⇒ Object (readonly)
Returns the value of attribute head_pipe.
# File 'lib/cascading/assembly.rb', line 36
def head_pipe
  @head_pipe
end
#tail_pipe ⇒ Object (readonly)
Returns the value of attribute tail_pipe.
# File 'lib/cascading/assembly.rb', line 36
def tail_pipe
  @tail_pipe
end
Instance Method Details
#assert(assertion, options = {}) ⇒ Object
Builds an each assertion pipe given a c.o.a.Assertion and adds it to the current Assembly.
The named options are:
- level: The assertion level; defaults to strict.
# File 'lib/cascading/assembly.rb', line 437
def assert(assertion, options = {})
  assertion_level = options[:level] || Java::CascadingOperation::AssertionLevel::STRICT
  parameters = [tail_pipe, assertion_level, assertion]
  make_pipe(Java::CascadingPipe::Each, parameters)
end
#assert_not_null(options = {}) ⇒ Object
Builds a pipe that asserts none of the fields in the tuple are null.
# File 'lib/cascading/assembly.rb', line 451
def assert_not_null(options = {})
  assertion = Java::CascadingOperationAssertion::AssertNotNull.new
  assert(assertion, options)
end
#assert_size_equals(size, options = {}) ⇒ Object
Builds a pipe that asserts the size of the tuple is the specified size.
# File 'lib/cascading/assembly.rb', line 445
def assert_size_equals(size, options = {})
  assertion = Java::CascadingOperationAssertion::AssertSizeEquals.new(size)
  assert(assertion, options)
end
#branch(name, &block) ⇒ Object
Builds a child Assembly that branches this Assembly given a name and block.
An assembly’s name is quite important as it will determine:
- The sources from which it will read, if any
- The name to be used in joins or unions downstream
- The name to be used to sink the output of the assembly downstream
Many branches may be built within an assembly. Like Flow#assembly, branch returns an Assembly object.
Example:
assembly 'some_work' do
  ...
  branch 'more_work' do
    ...
  end
  branch 'yet_more_work' do
    ...
  end
end
# File 'lib/cascading/assembly.rb', line 303
def branch(name, &block)
  raise "Could not build branch '#{name}'; block required" unless block_given?
  assembly = Assembly.new(name, self, @outgoing_scopes)
  add_child(assembly)
  assembly.instance_eval(&block)
  assembly
end
#debug_scope ⇒ Object
Prints information about the scope of this Assembly at the point at which it is called. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.
# File 'lib/cascading/assembly.rb', line 96
def debug_scope
  puts "Current scope for '#{name}':\n #{scope}\n----------\n"
end
#describe(offset = '') ⇒ Object
Produces a textual description of this Assembly. The description details the structure of the Assembly, its input and output fields and any children (branches). The offset parameter allows for this describe to be nested within a calling context, which lets us indent the structural hierarchy of a job.
# File 'lib/cascading/assembly.rb', line 70
def describe(offset = '')
  incoming_scopes_desc = "#{@incoming_scopes.map{ |incoming_scope| incoming_scope.values_fields.to_a.inspect }.join(', ')}"
  incoming_scopes_desc = "(#{incoming_scopes_desc})" unless @incoming_scopes.size == 1
  description = "#{offset}#{name}:assembly :: #{incoming_scopes_desc} -> #{scope.values_fields.to_a.inspect}"
  description += "\n#{child_names.map{ |child| children[child].describe("#{offset} ") }.join("\n")}" unless children.empty?
  description
end
#each(*args_with_options) ⇒ Object
Builds a basic each pipe and adds it to the current Assembly. If no input fields are given, the argument selector defaults to all fields, a default inherited from c.p.Each. Exactly one of :function and :filter must be specified, and filters do not support an :output selector.
The named options are:
- filter: A Cascading Filter, mutually exclusive with :function.
- function: A Cascading Function, mutually exclusive with :filter.
- output: c.p.Each output selector, only valid with :function.
Example:
each fields(input_fields), :function => Java::CascadingOperation::Identity.new
each 'field1', 'field2', :function => Java::CascadingOperation::Identity.new
# File 'lib/cascading/assembly.rb', line 411
def each(*args_with_options)
  options, in_fields = args_with_options.extract_options!, fields(args_with_options)
  out_fields = fields(options[:output]) # Default Fields.RESULTS from c.o.Each
  operation = options[:filter] || options[:function]
  raise 'each requires either :filter or :function' unless operation
  raise 'c.p.Each does not support applying an output selector to a c.o.Filter' if options[:filter] && options[:output]
  parameters = [tail_pipe, in_fields, operation, out_fields].compact
  each = make_pipe(Java::CascadingPipe::Each, parameters)
  raise ':function specified but c.o.Filter provided' if options[:function] && each.is_filter
  raise ':filter specified but c.o.Function provided' if options[:filter] && each.is_function
  each
end
#group_by(*args_with_options, &block) ⇒ Object
Builds a new GroupBy pipe that groups on the fields given in args_with_options. The block passed to this method will be evaluated in the context of Aggregations, not Assembly.
The named options are:
- sort_by: Optional keys for within-group sort.
- reverse: Boolean that can reverse the order of within-group sorting (only makes sense given :sort_by keys).
Example:
assembly 'total' do
  ...
  insert 'const' => 1
  group_by 'const' do
    count
    sum 'val1', 'val2', :type => :long
  end
  discard 'const'
end
# File 'lib/cascading/assembly.rb', line 330
def group_by(*args_with_options, &block)
  options, group_fields = args_with_options.extract_options!, fields(args_with_options)
  sort_fields = fields(options[:sort_by])
  reverse = options[:reverse]
  parameters = [tail_pipe, group_fields, sort_fields, reverse].compact
  apply_aggregations(Java::CascadingPipe::GroupBy.new(*parameters), [scope], &block)
end
#hash_join(*args_with_options) ⇒ Object
Builds a HashJoin pipe. This should be used carefully, as the right side of the join is accumulated entirely in memory. Requires a list of assembly names to join and :on to specify the join_fields. Note that a hash_join “takes over” the Assembly in which it is built, so it is typically the first statement within the block of the assembly or branch. Additionally, a hash join does not accept a block for aggregations like other joins; this restriction is enforced here, but comes directly from Cascading.
The named options are:
- on: The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.
- declared_fields: Specifies the names of the fields that will be available to aggregations, or after the join if no aggregations are specified. Defaults to a deduplicated array of incoming field names (see Cascading::dedup_fields).
- joiner: A specification of the c.p.j.Joiner to use. Values like :inner and 'inner', :right and 'right' are accepted, as well as an array specifying mixed joins. Typically this is not provided; instead, one of the higher-level join methods on Assembly (like Assembly#inner_join or Assembly#right_join) is used directly.
Example:
assembly 'join_left_right' do
  hash_join 'left', 'right', :on => ['key1', 'key2'], :joiner => :inner
end
# File 'lib/cascading/assembly.rb', line 134
def hash_join(*args_with_options)
  raise ArgumentError, "HashJoin doesn't support aggregations so the block provided to hash_join will be ignored" if block_given?
  options, assembly_names = args_with_options.extract_options!, args_with_options
  options[:hash] = true
  prepare_join(assembly_names, options)
end
#inner_join(*args_with_options, &block) ⇒ Object
Builds an inner join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.
The named options are:
- on: The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.
- declared_fields: Specifies the names of the fields that will be available to aggregations, or after the join if no aggregations are specified. Defaults to a deduplicated array of incoming field names (see Cascading::dedup_fields).
Example:
assembly 'join_left_right' do
  inner_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end
# File 'lib/cascading/assembly.rb', line 195
def inner_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :inner
  args_with_options << options
  join(*args_with_options, &block)
end
#join(*args_with_options, &block) ⇒ Object Also known as: co_group
Builds a join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields. Note that a join “takes over” the Assembly in which it is built, so it is typically the first statement within the block of the assembly or branch. The block passed to this method will be evaluated in the context of Aggregations, not Assembly.
The named options are:
- on: The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.
- declared_fields: Specifies the names of the fields that will be available to aggregations, or after the join if no aggregations are specified. Defaults to a deduplicated array of incoming field names (see Cascading::dedup_fields).
- joiner: A specification of the c.p.j.Joiner to use. Values like :inner and 'inner', :right and 'right' are accepted, as well as an array specifying mixed joins. Typically this is not provided; instead, one of the higher-level join methods on Assembly (like Assembly#inner_join or Assembly#right_join) is used directly.
Example:
assembly 'join_left_right' do
  join 'left', 'right', :on => ['key1', 'key2'], :joiner => :inner do
    sum 'val1', 'val2', :type => :long
  end
end
# File 'lib/cascading/assembly.rb', line 169
def join(*args_with_options, &block)
  options, assembly_names = args_with_options.extract_options!, args_with_options
  options[:hash] = false
  prepare_join(assembly_names, options, &block)
end
#left_join(*args_with_options, &block) ⇒ Object
Builds a left join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.
The named options are:
- on: The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.
- declared_fields: Specifies the names of the fields that will be available to aggregations, or after the join if no aggregations are specified. Defaults to a deduplicated array of incoming field names (see Cascading::dedup_fields).
Example:
assembly 'join_left_right' do
  left_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end
# File 'lib/cascading/assembly.rb', line 221
def left_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :left
  args_with_options << options
  join(*args_with_options, &block)
end
#outer_join(*args_with_options, &block) ⇒ Object
Builds an outer join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.
The named options are:
- on: The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.
- declared_fields: Specifies the names of the fields that will be available to aggregations, or after the join if no aggregations are specified. Defaults to a deduplicated array of incoming field names (see Cascading::dedup_fields).
Example:
assembly 'join_left_right' do
  outer_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end
# File 'lib/cascading/assembly.rb', line 273
def outer_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :outer
  args_with_options << options
  join(*args_with_options, &block)
end
#parent_flow ⇒ Object
Rather than the immediate parent, this method returns the parent flow of this Assembly. If this is a branch, we must traverse the parents of parent assemblies.
# File 'lib/cascading/assembly.rb', line 81
def parent_flow
  return parent if parent.kind_of?(Flow)
  parent.parent_flow
end
#right_join(*args_with_options, &block) ⇒ Object
Builds a right join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.
The named options are:
- on: The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.
- declared_fields: Specifies the names of the fields that will be available to aggregations, or after the join if no aggregations are specified. Defaults to a deduplicated array of incoming field names (see Cascading::dedup_fields).
Example:
assembly 'join_left_right' do
  right_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end
# File 'lib/cascading/assembly.rb', line 247
def right_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :right
  args_with_options << options
  join(*args_with_options, &block)
end
#scope ⇒ Object
Accesses the outgoing scope of this Assembly at the point at which it is called. This is useful for grabbing the values_fields at any point in the construction of the Assembly. See Scope for details.
# File 'lib/cascading/assembly.rb', line 89
def scope
  @outgoing_scopes[name]
end
#sub_assembly(sub_assembly, pipes = [tail_pipe], incoming_scopes = [scope]) ⇒ Object
Allows you to plug c.p.SubAssemblies into an Assembly under certain assumptions. Note that by default the tail pipe of this Assembly is extended using a linear SubAssembly. See the SubAssembly class for details.
Example:
assembly 'id_rows' do
  ...
  sub_assembly Java::CascadingPipeAssembly::Discard.new(tail_pipe, fields('id'))
end
# File 'lib/cascading/assembly.rb', line 388
def sub_assembly(sub_assembly, pipes = [tail_pipe], incoming_scopes = [scope])
  sub_assembly = SubAssembly.new(self, sub_assembly)
  sub_assembly.finalize(pipes, incoming_scopes)
  @tail_pipe = sub_assembly.tail_pipe
  @outgoing_scopes[name] = sub_assembly.scope
  sub_assembly
end
#to_s ⇒ Object
Returns a string describing this Assembly, including its name, head pipe, and tail pipe.
# File 'lib/cascading/assembly.rb', line 102
def to_s
  "#{name} : head pipe : #{head_pipe} - tail pipe: #{tail_pipe}"
end
#union(*args_with_options, &block) ⇒ Object Also known as: union_pipes
Unifies multiple incoming pipes sharing the same field structure using a GroupBy. Accepts :on like join and :sort_by and :reverse like group_by, as well as a block which may be used for a sequence of Every aggregations. The block passed to this method will be evaluated in the context of Aggregations, not Assembly.
By default, union groups only on the first field (see line 189 of GroupBy.java).
The named options are:
- on: The keys of the union, which default to the first field in the first input assembly.
- sort_by: Optional keys for sorting.
- reverse: Boolean that can reverse the order of sorting (only makes sense given :sort_by keys; if :reverse is given without :sort_by, the group fields are used as sort keys).
Example:
assembly 'union_left_right' do
  union 'left', 'right' do
    sum 'val1', 'val2', :type => :long
  end
end
# File 'lib/cascading/assembly.rb', line 360
def union(*args_with_options, &block)
  options, assembly_names = args_with_options.extract_options!, args_with_options
  group_fields = fields(options[:on])
  sort_fields = fields(options[:sort_by])
  reverse = options[:reverse]
  pipes, _ = populate_incoming_scopes(assembly_names)
  # Must provide group_fields to ensure field name propagation
  group_fields = fields(@incoming_scopes.first.values_fields.get(0)) unless group_fields
  # FIXME: GroupBy is missing a constructor for union in wip-255
  sort_fields = group_fields if !sort_fields && !reverse.nil?
  parameters = [pipes.to_java(Java::CascadingPipe::Pipe), group_fields, sort_fields, reverse].compact
  apply_aggregations(Java::CascadingPipe::GroupBy.new(*parameters), @incoming_scopes, &block)
end