Class: Cascading::Assembly

Inherits:
Node
  • Object
Includes:
FilterOperations, IdentityOperations, Operations, RegexOperations, TextOperations
Defined in:
lib/cascading/assembly.rb

Overview

An Assembly is a sequence of Cascading pipes (Each, GroupBy, CoGroup, Every, and SubAssembly). This class is your primary mechanism for doing work within a flow: it contains all the functions and filters you will apply to a pipe (Eaches), as well as group_by, union, and join. For aggregators and buffers, please see Aggregations.

Function and filter DSL rules:

  • Use positional arguments for required parameters

  • Use options = {} for optional parameters

  • Use *args sparingly, specifically when you need to accept a variable-length list of fields

  • If you require both a variable-length list of fields and optional parameters, see the Array#extract_options! extension

  • If you choose to name a required parameter, add it to options = {} and raise an exception if the caller does not provide it

  • If a required parameter may be provided under any one of a set of option names, raise an exception if the caller does not provide at least one of them (see :function and :filter in Assembly#each for an example)
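The rules above can be illustrated with a small, self-contained sketch. It assumes a simplified Array#extract_options! (pop a trailing Hash, otherwise return an empty one) in place of the extension the rules refer to, and my_each is a hypothetical method, not part of the library; operations are plain symbols rather than Cascading objects.

```ruby
# Simplified stand-in (an assumption) for the Array#extract_options!
# extension: remove and return a trailing Hash, or return {} if absent.
class Array
  def extract_options!
    last.is_a?(::Hash) ? pop : {}
  end
end

# Hypothetical DSL method following the rules:
#   - a variable-length list of field names arrives through *args
#   - optional parameters arrive in a trailing options hash
#   - a required parameter may be given under one of two option names,
#     and omitting both raises an exception
def my_each(*args_with_options)
  options = args_with_options.extract_options!
  in_fields = args_with_options
  operation = options[:filter] || options[:function]
  raise ArgumentError, 'my_each requires either :filter or :function' unless operation

  { input: in_fields, operation: operation, output: options[:output] }
end

my_each('field1', 'field2', :function => :identity)
```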

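Function and filter DSL standard optional parameter names (abbreviations such as c.p. and c.o. stand for the Java packages cascading.pipe and cascading.operation):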

input

c.p.Each argument selector

into

c.o.Operation field declaration

output

c.p.Each output selector

A note on aliases: when a DSL method uniquely wraps a single Cascading operation, we attempt to provide an alias that matches the Cascading operation. However, Cascading operations are often nouns rather than verbs, and the latter are preferable for a dataflow DSL.
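As a minimal sketch of this convention, assuming nothing beyond plain Ruby: the verb is the primary method name and alias_method supplies the name of the wrapped Cascading operation, in the way join is also known as co_group. AliasDemo and its string result are hypothetical.

```ruby
# Hypothetical class illustrating the alias convention: the verb is the
# primary DSL name, and the alias mirrors the wrapped Cascading operation.
class AliasDemo
  def join(*assembly_names)
    "joined #{assembly_names.join(' and ')}"
  end

  # Alias matching the Cascading operation (CoGroup) that join wraps
  alias_method :co_group, :join
end

AliasDemo.new.co_group('left', 'right')
```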

Instance Attribute Summary

Attributes inherited from Node

#child_names, #children, #last_child, #name, #parent

Instance Method Summary

Methods included from TextOperations

#format_date, #join_fields, #parse_date

Methods included from RegexOperations

#match_rows, #parse, #replace, #split, #split_rows

Methods included from FilterOperations

#filter, #filter_not_null, #filter_null, #reject, #where

Methods included from IdentityOperations

#cast, #copy, #discard, #pass, #project, #rename

Methods included from Operations

#debug, #insert, #null_indicator, #regex_contains, #set_value, #ungroup

Methods inherited from Node

#add_child, #find_child, #qualified_name, #root

Constructor Details

#initialize(name, parent, outgoing_scopes = {}) ⇒ Assembly

Do not use this constructor directly; instead, use Flow#assembly or Assembly#branch to build assemblies.

Builds an Assembly given a name, parent, and optional outgoing_scopes (necessary only for branching).

An assembly’s name is quite important as it will determine:

  • The sources from which it will read, if any

  • The name to be used in joins or unions downstream

  • The name to be used to sink the output of the assembly downstream



# File 'lib/cascading/assembly.rb', line 48

def initialize(name, parent, outgoing_scopes = {})
  super(name, parent)

  @outgoing_scopes = outgoing_scopes
  if parent.kind_of?(Assembly)
    @head_pipe = Java::CascadingPipe::Pipe.new(name, parent.tail_pipe)
    # Copy to allow destructive update of name
    @outgoing_scopes[name] = parent.scope.copy
    scope.scope.name = name
  else # Parent is a Flow
    @head_pipe = Java::CascadingPipe::Pipe.new(name)
    @outgoing_scopes[name] ||= Scope.empty_scope(name)
  end
  @tail_pipe = head_pipe
  @incoming_scopes = [scope]
end
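The scope bookkeeping in the constructor can be modeled in plain Ruby. This is a simplified sketch: MiniScope and MiniAssembly are hypothetical stand-ins for Scope and Assembly, and pipes are omitted. It shows that a branch shares its parent's outgoing_scopes hash (keyed by assembly name) but starts from a copy of the parent's scope, so renaming the copy leaves the parent untouched.

```ruby
# Hypothetical stand-in for Scope: a name plus a list of field names.
MiniScope = Struct.new(:name, :fields) do
  def copy
    MiniScope.new(name.dup, fields.dup)
  end
end

# Hypothetical stand-in for Assembly, keeping only the scope bookkeeping.
class MiniAssembly
  attr_reader :name, :outgoing_scopes

  def initialize(name, parent = nil, outgoing_scopes = {})
    @name = name
    @outgoing_scopes = outgoing_scopes
    if parent
      # Copy so the rename below does not affect the parent's scope
      @outgoing_scopes[name] = parent.scope.copy
      scope.name = name
    else
      @outgoing_scopes[name] ||= MiniScope.new(name, [])
    end
  end

  def scope
    @outgoing_scopes[name]
  end
end

root = MiniAssembly.new('some_work')
root.scope.fields << 'id'
child = MiniAssembly.new('more_work', root, root.outgoing_scopes)
```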

Instance Attribute Details

#head_pipe ⇒ Object (readonly)

Returns the value of attribute head_pipe.



# File 'lib/cascading/assembly.rb', line 36

def head_pipe
  @head_pipe
end

#tail_pipe ⇒ Object (readonly)

Returns the value of attribute tail_pipe.



# File 'lib/cascading/assembly.rb', line 36

def tail_pipe
  @tail_pipe
end

Instance Method Details

#assert(assertion, options = {}) ⇒ Object

Builds an each assertion pipe given a c.o.a.Assertion and adds it to the current Assembly.

The named options are:

level

The assertion level; defaults to strict.



# File 'lib/cascading/assembly.rb', line 437

def assert(assertion, options = {})
  assertion_level = options[:level] || Java::CascadingOperation::AssertionLevel::STRICT

  parameters = [tail_pipe, assertion_level, assertion]
  make_pipe(Java::CascadingPipe::Each, parameters)
end

#assert_not_null(options = {}) ⇒ Object

Builds a pipe that asserts that none of the fields in the tuple are null.



# File 'lib/cascading/assembly.rb', line 451

def assert_not_null(options = {})
  assertion = Java::CascadingOperationAssertion::AssertNotNull.new
  assert(assertion, options)
end

#assert_size_equals(size, options = {}) ⇒ Object

Builds a pipe that asserts the size of the tuple is the specified size.



# File 'lib/cascading/assembly.rb', line 445

def assert_size_equals(size, options = {})
  assertion = Java::CascadingOperationAssertion::AssertSizeEquals.new(size)
  assert(assertion, options)
end

#branch(name, &block) ⇒ Object

Builds a child Assembly that branches this Assembly given a name and block.

An assembly’s name is quite important as it will determine:

  • The sources from which it will read, if any

  • The name to be used in joins or unions downstream

  • The name to be used to sink the output of the assembly downstream

Many branches may be built within an assembly. Like the Flow#assembly constructor, branch returns an Assembly object.

Example:

assembly 'some_work' do
  ...

  branch 'more_work' do
    ...
  end

  branch 'yet_more_work' do
    ...
  end
end


# File 'lib/cascading/assembly.rb', line 303

def branch(name, &block)
  raise "Could not build branch '#{name}'; block required" unless block_given?
  assembly = Assembly.new(name, self, @outgoing_scopes)
  add_child(assembly)
  assembly.instance_eval(&block)
  assembly
end
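The key mechanism in branch is instance_eval: the block runs with the new child as self, so bare DSL calls inside it, including nested branch calls, go to the child. A minimal sketch, assuming a hypothetical TreeNode class in place of Assembly (no pipes or scopes):

```ruby
# Hypothetical miniature of Assembly#branch, keeping only the tree building.
class TreeNode
  attr_reader :name, :children

  def initialize(name)
    @name = name
    @children = []
  end

  def branch(name, &block)
    raise "Could not build branch '#{name}'; block required" unless block_given?
    child = TreeNode.new(name)
    @children << child
    # Evaluate the block with the child as self, so nested calls target it
    child.instance_eval(&block)
    child
  end
end

root = TreeNode.new('some_work')
root.instance_eval do
  branch 'more_work' do
    branch 'nested_work' do
    end
  end

  branch 'yet_more_work' do
  end
end
```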

#debug_scope ⇒ Object

Prints information about the scope of this Assembly at the point at which it is called. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.



# File 'lib/cascading/assembly.rb', line 96

def debug_scope
  puts "Current scope for '#{name}':\n  #{scope}\n----------\n"
end

#describe(offset = '') ⇒ Object

Produces a textual description of this Assembly. The description details the structure of the Assembly, its input and output fields, and any children (branches). The offset parameter allows this description to be nested within a calling context, which indents the structural hierarchy of a job.



# File 'lib/cascading/assembly.rb', line 70

def describe(offset = '')
  incoming_scopes_desc = "#{@incoming_scopes.map{ |incoming_scope| incoming_scope.values_fields.to_a.inspect }.join(', ')}"
  incoming_scopes_desc = "(#{incoming_scopes_desc})" unless @incoming_scopes.size == 1
  description =  "#{offset}#{name}:assembly :: #{incoming_scopes_desc} -> #{scope.values_fields.to_a.inspect}"
  description += "\n#{child_names.map{ |child| children[child].describe("#{offset}  ") }.join("\n")}" unless children.empty?
  description
end
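How the offset produces indentation can be seen in a reduced sketch. Described is a hypothetical Struct standing in for Assembly; it assumes a single incoming scope (so the parenthesized multi-input form is skipped) and uses plain arrays of field names instead of Scope objects.

```ruby
# Hypothetical miniature of Assembly#describe: each level prints its own
# line at the current offset and hands its children an offset two spaces
# deeper, so the output mirrors the branch hierarchy.
Described = Struct.new(:name, :in_fields, :out_fields, :children) do
  def describe(offset = '')
    desc = "#{offset}#{name}:assembly :: #{in_fields.inspect} -> #{out_fields.inspect}"
    desc += "\n" + children.map { |c| c.describe("#{offset}  ") }.join("\n") unless children.empty?
    desc
  end
end

leaf = Described.new('more_work', ['id'], ['id', 'x'], [])
root = Described.new('some_work', ['id'], ['id'], [leaf])
puts root.describe
```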

#each(*args_with_options) ⇒ Object

Builds a basic each pipe and adds it to the current Assembly. The default argument selector is all_fields, a default inherited from c.p.Each. Exactly one of :function and :filter must be specified, and filters do not support an :output selector.

The named options are:

filter

A Cascading Filter, mutually exclusive with :function.

function

A Cascading Function, mutually exclusive with :filter.

output

c.p.Each output selector, only valid with :function.

Example:

each fields(input_fields), :function => Java::CascadingOperation::Identity.new
each 'field1', 'field2', :function => Java::CascadingOperation::Identity.new


# File 'lib/cascading/assembly.rb', line 411

def each(*args_with_options)
  options, in_fields = args_with_options.extract_options!, fields(args_with_options)
  out_fields = fields(options[:output]) # Default Fields.RESULTS from c.p.Each
  operation = options[:filter] || options[:function]
  raise 'each requires either :filter or :function' unless operation
  raise 'c.p.Each does not support applying an output selector to a c.o.Filter' if options[:filter] && options[:output]

  parameters = [tail_pipe, in_fields, operation, out_fields].compact
  each = make_pipe(Java::CascadingPipe::Each, parameters)
  raise ':function specified but c.o.Filter provided' if options[:function] && each.is_filter
  raise ':filter specified but c.o.Function provided' if options[:filter] && each.is_function

  each
end
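The argument checks in each, and the way compact lets an omitted :output fall through to a shorter c.p.Each constructor, can be sketched without Cascading. This is a hypothetical reduction: operations, pipes, and field lists are plain Ruby values, and each_parameters is not a library method.

```ruby
# Hypothetical reduction of the option checks that Assembly#each performs.
def validate_each_options(options)
  operation = options[:filter] || options[:function]
  raise 'each requires either :filter or :function' unless operation
  if options[:filter] && options[:output]
    raise 'c.p.Each does not support applying an output selector to a c.o.Filter'
  end
  operation
end

# nil entries are dropped by compact, so leaving out :output yields a
# shorter parameter list (and c.p.Each's default output selector applies).
def each_parameters(tail_pipe, in_fields, options)
  operation = validate_each_options(options)
  [tail_pipe, in_fields, operation, options[:output]].compact
end

each_parameters(:pipe, ['field1'], :function => :identity)
```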

#group_by(*args_with_options, &block) ⇒ Object

Builds a new GroupBy pipe that groups on the fields given in args_with_options. The block passed to this method will be evaluated in the context of Aggregations, not Assembly.

The named options are:

sort_by

Optional keys for within-group sort.

reverse

Boolean that can reverse the order of within-group sorting (only makes sense given :sort_by keys).

Example:

assembly 'total' do
  ...
  insert 'const' => 1
  group_by 'const' do
    count
    sum 'val1', 'val2', :type => :long
  end
  discard 'const'
end


# File 'lib/cascading/assembly.rb', line 330

def group_by(*args_with_options, &block)
  options, group_fields = args_with_options.extract_options!, fields(args_with_options)
  sort_fields = fields(options[:sort_by])
  reverse = options[:reverse]

  parameters = [tail_pipe, group_fields, sort_fields, reverse].compact
  apply_aggregations(Java::CascadingPipe::GroupBy.new(*parameters), [scope], &block)
end
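The optional group_by parameters map onto positional constructor arguments through compact. The sketch below is a simplification that assumes plain arrays in place of c.t.Fields; group_by_parameters is hypothetical. Note that compact removes only nil, so an explicit :reverse => false is kept.

```ruby
# Hypothetical sketch of how group_by turns options into the positional
# parameter list passed to the GroupBy constructor.
def group_by_parameters(tail_pipe, group_fields, options = {})
  sort_fields = options[:sort_by]
  reverse = options[:reverse]
  # compact drops nil (omitted options) but keeps false
  [tail_pipe, group_fields, sort_fields, reverse].compact
end

group_by_parameters(:pipe, ['const'], :sort_by => ['val1'], :reverse => true)
```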

#hash_join(*args_with_options) ⇒ Object

Builds a HashJoin pipe. This should be used carefully, as the right side of the join is accumulated entirely in memory. Requires a list of assembly names to join and :on to specify the join_fields. Note that a hash_join “takes over” the Assembly in which it is built, so it is typically the first statement within the block of the assembly or branch. Additionally, a hash join does not accept a block for aggregations like other joins; this restriction is enforced here, but comes directly from Cascading.

The named options are:

on

The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.

declared_fields

By default, a deduplicated array of incoming field names (see Cascading::dedup_fields). Specifies the names of the fields that will be available to aggregations or post-join if no aggregations are specified.

joiner

A specification of the c.p.j.Joiner to use. Values like :inner and 'inner', :right and 'right' are accepted, as well as an array specifying mixed joins. Typically this is not provided; instead, one of the higher-level join methods on Assembly is used directly (like Assembly#inner_join or Assembly#right_join).

Example:

assembly 'join_left_right' do
  hash_join 'left', 'right', :on => ['key1', 'key2'], :joiner => :inner
end

Raises:

  • (ArgumentError)


# File 'lib/cascading/assembly.rb', line 134

def hash_join(*args_with_options)
  raise ArgumentError, "HashJoin doesn't support aggregations, so hash_join does not accept a block" if block_given?

  options, assembly_names = args_with_options.extract_options!, args_with_options
  options[:hash] = true
  prepare_join(assembly_names, options)
end

#inner_join(*args_with_options, &block) ⇒ Object

Builds an inner join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.

The named options are:

on

The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.

declared_fields

By default, a deduplicated array of incoming field names (see Cascading::dedup_fields). Specifies the names of the fields that will be available to aggregations or post-join if no aggregations are specified.

Example:

assembly 'join_left_right' do
  inner_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end


# File 'lib/cascading/assembly.rb', line 195

def inner_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :inner
  args_with_options << options
  join(*args_with_options, &block)
end
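inner_join, left_join, right_join, and outer_join all follow the same wrapper pattern: peel off the trailing options hash, force :joiner, and forward the names, options, and block to join. A sketch of that pattern, assuming a hypothetical JoinRecorder whose join only records its arguments; unlike the source, it uses merge so the caller's hash is not mutated.

```ruby
# Hypothetical class demonstrating the joiner-wrapper pattern.
class JoinRecorder
  # Stand-in for Assembly#join: record what was received
  def join(*args_with_options, &block)
    options = args_with_options.last.is_a?(Hash) ? args_with_options.pop : {}
    { names: args_with_options, options: options, block: block }
  end

  # Generate one wrapper per joiner, each forcing its :joiner value
  { inner: :inner_join, left: :left_join, right: :right_join, outer: :outer_join }.each do |joiner, meth|
    define_method(meth) do |*args_with_options, &block|
      options = args_with_options.last.is_a?(Hash) ? args_with_options.pop : {}
      join(*args_with_options, options.merge(joiner: joiner), &block)
    end
  end
end

JoinRecorder.new.inner_join('left', 'right', :on => ['key1', 'key2']) { :aggregations }
```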

#join(*args_with_options, &block) ⇒ Object Also known as: co_group

Builds a join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields. Note that a join “takes over” the Assembly in which it is built, so it is typically the first statement within the block of the assembly or branch. The block passed to this method will be evaluated in the context of Aggregations, not Assembly.

The named options are:

on

The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.

declared_fields

By default, a deduplicated array of incoming field names (see Cascading::dedup_fields). Specifies the names of the fields that will be available to aggregations or post-join if no aggregations are specified.

joiner

A specification of the c.p.j.Joiner to use. Values like :inner and 'inner', :right and 'right' are accepted, as well as an array specifying mixed joins. Typically this is not provided; instead, one of the higher-level join methods on Assembly is used directly (like Assembly#inner_join or Assembly#right_join).

Example:

assembly 'join_left_right' do
  join 'left', 'right', :on => ['key1', 'key2'], :joiner => :inner do
    sum 'val1', 'val2', :type => :long
  end
end


# File 'lib/cascading/assembly.rb', line 169

def join(*args_with_options, &block)
  options, assembly_names = args_with_options.extract_options!, args_with_options
  options[:hash] = false
  prepare_join(assembly_names, options, &block)
end
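The :on option accepts two shapes: an array of key names shared by every input, or a hash mapping each assembly name to its own keys. The hypothetical helper below (not part of cascading.jruby; prepare_join's actual handling is not shown here) normalizes both shapes to a hash of assembly name to key array, which makes the equivalence explicit.

```ruby
# Hypothetical helper normalizing both accepted shapes of :on to
# { assembly_name => [key, ...] }.
def normalize_join_keys(assembly_names, on)
  case on
  when Hash
    # Every joined assembly must have its own key list
    missing = assembly_names - on.keys
    raise ArgumentError, "no join keys for #{missing.join(', ')}" unless missing.empty?
    on.transform_values { |keys| Array(keys) }
  when Array
    # The same key names apply to every input
    assembly_names.to_h { |name| [name, on] }
  else
    raise ArgumentError, ':on must be an Array or a Hash'
  end
end

normalize_join_keys(['left', 'right'], 'left' => ['lkey'], 'right' => ['rkey'])
```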

#left_join(*args_with_options, &block) ⇒ Object

Builds a left join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.

The named options are:

on

The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.

declared_fields

By default, a deduplicated array of incoming field names (see Cascading::dedup_fields). Specifies the names of the fields that will be available to aggregations or post-join if no aggregations are specified.

Example:

assembly 'join_left_right' do
  left_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end


# File 'lib/cascading/assembly.rb', line 221

def left_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :left
  args_with_options << options
  join(*args_with_options, &block)
end

#outer_join(*args_with_options, &block) ⇒ Object

Builds an outer join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.

The named options are:

on

The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.

declared_fields

By default, a deduplicated array of incoming field names (see Cascading::dedup_fields). Specifies the names of the fields that will be available to aggregations or post-join if no aggregations are specified.

Example:

assembly 'join_left_right' do
  outer_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end


# File 'lib/cascading/assembly.rb', line 273

def outer_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :outer
  args_with_options << options
  join(*args_with_options, &block)
end

#parent_flow ⇒ Object

Returns the Flow that contains this Assembly, rather than its immediate parent. For a branch, this walks up through the parent assemblies until a Flow is reached.



# File 'lib/cascading/assembly.rb', line 81

def parent_flow
  return parent if parent.kind_of?(Flow)
  parent.parent_flow
end
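The recursion can be exercised with hypothetical stand-ins, FakeFlow and FakeAssembly, in place of Flow and Assembly: a top-level assembly returns its parent directly, while a nested branch delegates upward until some ancestor's parent is the Flow.

```ruby
# Hypothetical stand-ins for Flow and Assembly, keeping only the parent links.
class FakeFlow; end

class FakeAssembly
  attr_reader :parent

  def initialize(parent)
    @parent = parent
  end

  # Same logic as Assembly#parent_flow
  def parent_flow
    return parent if parent.kind_of?(FakeFlow)
    parent.parent_flow
  end
end

flow = FakeFlow.new
top = FakeAssembly.new(flow)
nested = FakeAssembly.new(FakeAssembly.new(top))
nested.parent_flow
```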

#right_join(*args_with_options, &block) ⇒ Object

Builds a right join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.

The named options are:

on

The keys of the join, an array of strings if they are the same in all inputs, or a hash mapping assembly names to key names if they differ across inputs.

declared_fields

By default, a deduplicated array of incoming field names (see Cascading::dedup_fields). Specifies the names of the fields that will be available to aggregations or post-join if no aggregations are specified.

Example:

assembly 'join_left_right' do
  right_join 'left', 'right', :on => ['key1', 'key2'] do
    sum 'val1', 'val2', :type => :long
  end
end


# File 'lib/cascading/assembly.rb', line 247

def right_join(*args_with_options, &block)
  options = args_with_options.extract_options!
  options[:joiner] = :right
  args_with_options << options
  join(*args_with_options, &block)
end

#scope ⇒ Object

Accesses the outgoing scope of this Assembly at the point at which it is called. This is useful for grabbing the values_fields at any point in the construction of the Assembly. See Scope for details.



# File 'lib/cascading/assembly.rb', line 89

def scope
  @outgoing_scopes[name]
end

#sub_assembly(sub_assembly, pipes = [tail_pipe], incoming_scopes = [scope]) ⇒ Object

Allows you to plug c.p.SubAssembly instances into an Assembly under certain assumptions. Note that, by default, this extends the tail pipe of this Assembly using a linear SubAssembly. See the SubAssembly class for details.

Example:

assembly 'id_rows' do
  ...
  sub_assembly Java::CascadingPipeAssembly::Discard.new(tail_pipe, fields('id'))
end


# File 'lib/cascading/assembly.rb', line 388

def sub_assembly(sub_assembly, pipes = [tail_pipe], incoming_scopes = [scope])
  sub_assembly = SubAssembly.new(self, sub_assembly)
  sub_assembly.finalize(pipes, incoming_scopes)

  @tail_pipe = sub_assembly.tail_pipe
  @outgoing_scopes[name] = sub_assembly.scope

  sub_assembly
end

#to_s ⇒ Object

Returns a string describing this Assembly, including its name, head pipe, and tail pipe.



# File 'lib/cascading/assembly.rb', line 102

def to_s
  "#{name} : head pipe : #{head_pipe} - tail pipe: #{tail_pipe}"
end

#union(*args_with_options, &block) ⇒ Object Also known as: union_pipes

Unifies multiple incoming pipes sharing the same field structure using a GroupBy. Accepts :on like join and :sort_by and :reverse like group_by, as well as a block which may be used for a sequence of Every aggregations. The block passed to this method will be evaluated in the context of Aggregations, not Assembly.

By default, union groups only on the first field (see line 189 of GroupBy.java).

The named options are:

on

The keys of the union, which defaults to the first field in the first input assembly.

sort_by

Optional keys for sorting.

reverse

Boolean that can reverse the order of sorting (only makes sense given :sort_by keys).

Example:

assembly 'union_left_right' do
  union 'left', 'right' do
    sum 'val1', 'val2', :type => :long
  end
end


# File 'lib/cascading/assembly.rb', line 360

def union(*args_with_options, &block)
  options, assembly_names = args_with_options.extract_options!, args_with_options
  group_fields = fields(options[:on])
  sort_fields = fields(options[:sort_by])
  reverse = options[:reverse]

  pipes, _ = populate_incoming_scopes(assembly_names)

  # Must provide group_fields to ensure field name propagation
  group_fields = fields(@incoming_scopes.first.values_fields.get(0)) unless group_fields

  # FIXME: GroupBy is missing a constructor for union in wip-255
  sort_fields = group_fields if !sort_fields && !reverse.nil?

  parameters = [pipes.to_java(Java::CascadingPipe::Pipe), group_fields, sort_fields, reverse].compact
  apply_aggregations(Java::CascadingPipe::GroupBy.new(*parameters), @incoming_scopes, &block)
end
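The defaulting logic for union's keys can be isolated in plain Ruby. In this hypothetical sketch, union_keys stands in for the relevant lines of union, and field lists are arrays of names rather than c.t.Fields. It shows the default grouping on the first field of the first input, and the FIXME workaround that sorts on the group keys when only :reverse is given.

```ruby
# Hypothetical sketch of how union settles its grouping and sorting keys.
def union_keys(first_input_fields, options = {})
  group_fields = options[:on]
  sort_fields = options[:sort_by]
  reverse = options[:reverse]

  # Default: group on the first field of the first input
  group_fields ||= [first_input_fields.first]

  # Workaround from the source: with :reverse but no :sort_by,
  # sort on the group keys
  sort_fields = group_fields if !sort_fields && !reverse.nil?

  [group_fields, sort_fields, reverse].compact
end

union_keys(['id', 'val1', 'val2'], :reverse => true)
```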