Class: Cascading::Aggregations

Inherits:
Object
Defined in:
lib/cascading/aggregations.rb

Overview

Aggregations is the context available to you within the block of a group_by, union, or join that allows you to apply Every pipes to the result of those operations. You may apply aggregators and buffers within this context subject to several rules laid out by Cascading.

Rules enforced by Aggregations:

  • Contains either 1 Buffer or >= 1 Aggregator (explicitly checked)

  • No GroupBys, CoGroups, Joins, or Merges (methods for these pipes do not exist on Aggregations)

  • No Eaches (Aggregations#each does not exist)

  • Aggregations may not branch (Aggregations#branch does not exist)
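The first rule above can be modeled in plain Ruby. This is only an illustrative sketch: `valid_every_sequence?` is a hypothetical helper, not part of cascading.jruby, and it reduces each Every to the symbol `:aggregator` or `:buffer`.

```ruby
# Hypothetical model of the "1 Buffer or >= 1 Aggregator" rule.
# Not library code: each Every is represented only by its kind.
def valid_every_sequence?(kinds)
  return false if kinds.empty?      # an empty block never instantiates Aggregations
  return true if kinds == [:buffer] # exactly one Buffer and nothing else
  kinds.all? { |kind| kind == :aggregator } # otherwise, Aggregators only
end

puts valid_every_sequence?([:buffer])                  # true
puts valid_every_sequence?([:aggregator, :aggregator]) # true
puts valid_every_sequence?([:buffer, :aggregator])     # false
puts valid_every_sequence?([:buffer, :buffer])         # false
```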

Externally enforced rules:

  • May be empty (in which case, Aggregations is not instantiated)

  • Must follow a GroupBy or CoGroup (not a HashJoin or Merge)

Optimizations:

  • If the leading Group is a GroupBy and all subsequent Everies are Aggregators that have a corresponding AggregateBy, Aggregations can replace the GroupBy/Aggregator pipe with a single composite AggregateBy

Aggregator and buffer DSL standard optional parameter names:

input

c.p.Every argument selector

into

c.o.Operation field declaration

output

c.p.Every output selector

Constructor Details

#initialize(assembly, group, incoming_scopes) ⇒ Aggregations

Do not use this constructor directly; instead, pass a block containing the desired aggregations to a group_by, union, or join and it will be instantiated for you.

Builds the context in which a sequence of Every aggregations may be evaluated in the given assembly appended to the given group pipe and with the given incoming_scopes.



# File 'lib/cascading/aggregations.rb', line 37

def initialize(assembly, group, incoming_scopes)
  @assembly = assembly
  @tail_pipe = group
  @scope = Scope.outgoing_scope(tail_pipe, incoming_scopes)

  # AggregateBy optimization only applies to GroupBy
  @aggregate_bys = tail_pipe.is_group_by ? [] : nil
end

Instance Attribute Details

#aggregate_bys ⇒ Object (readonly)

Returns the value of attribute aggregate_bys.



# File 'lib/cascading/aggregations.rb', line 28

def aggregate_bys
  @aggregate_bys
end

#assembly ⇒ Object (readonly)

Returns the value of attribute assembly.



# File 'lib/cascading/aggregations.rb', line 28

def assembly
  @assembly
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.



# File 'lib/cascading/aggregations.rb', line 28

def scope
  @scope
end

#tail_pipe ⇒ Object (readonly)

Returns the value of attribute tail_pipe.



# File 'lib/cascading/aggregations.rb', line 28

def tail_pipe
  @tail_pipe
end

Instance Method Details

#assert_group(assertion, options = {}) ⇒ Object

Builds an every assertion pipe given a c.o.a.Assertion and adds it to the current list of aggregations. Note this breaks a chain of AggregateBys.

The named options are:

level

The assertion level; defaults to strict.



# File 'lib/cascading/aggregations.rb', line 115

def assert_group(assertion, options = {})
  assertion_level = options[:level] || Java::CascadingOperation::AssertionLevel::STRICT

  parameters = [tail_pipe, assertion_level, assertion]
  make_pipe(Java::CascadingPipe::Every, parameters)
end

#assert_group_size_equals(size, options = {}) ⇒ Object

Builds a pipe that asserts the size of the current group is the specified size for all groups.



# File 'lib/cascading/aggregations.rb', line 124

def assert_group_size_equals(size, options = {})
  assertion = Java::CascadingOperationAssertion::AssertGroupSizeEquals.new(size)
  assert_group(assertion, options)
end

#average(*fields_or_field_map) ⇒ Object

Averages the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.

Examples:

assembly 'aggregate' do
  ...
  insert 'const' => 1
  group_by 'const' do
    average 'field1', 'field2'
    average 'field3' => 'fieldA', 'field4' => 'fieldB'
  end
  discard 'const'
end


# File 'lib/cascading/aggregations.rb', line 272

def average(*fields_or_field_map)
  field_map, _ = extract_field_map(fields_or_field_map)

  field_map.each do |in_field, out_field|
    average_aggregator = Java::CascadingOperationAggregator::Average.new(fields(out_field))
    average_by = Java::CascadingPipeAssembly::AverageBy.new(fields(in_field), fields(out_field))
    every(in_field, :aggregator => average_aggregator, :output => all_fields, :aggregate_by => average_by)
  end
  raise "average invoked on 0 fields" if field_map.empty?
end

#can_aggregate_by? ⇒ Boolean

We can replace these aggregations with the corresponding composite AggregateBy if the leading Group was a GroupBy and all subsequent Aggregators had a corresponding AggregateBy (which we’ve encoded in the list of aggregate_bys being a non-empty array).

Returns:

  • (Boolean)


# File 'lib/cascading/aggregations.rb', line 58

def can_aggregate_by?
  !aggregate_bys.nil? && !aggregate_bys.empty?
end
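To see how this check interacts with the constructor and with #every, the bookkeeping can be modeled in plain Ruby. The sketch below copies the three relevant conditionals from the source listings on this page into a hypothetical `AggregateByTracker` class. Symbols stand in for real AggregateBy objects, and the class itself is an assumption made for illustration, not part of the library.

```ruby
# Hypothetical stand-in for the aggregate_bys bookkeeping in Aggregations.
class AggregateByTracker
  attr_reader :aggregate_bys

  # Mirrors #initialize: only a GroupBy starts with an (empty) list.
  def initialize(group_by)
    @aggregate_bys = group_by ? [] : nil
  end

  # Mirrors #every: an Every without an :aggregate_by permanently
  # disables the optimization by setting the list to nil.
  def every(aggregate_by = nil)
    if aggregate_by && aggregate_bys
      aggregate_bys << aggregate_by
    else
      @aggregate_bys = nil
    end
    self
  end

  # Mirrors #can_aggregate_by?
  def can_aggregate_by?
    !aggregate_bys.nil? && !aggregate_bys.empty?
  end
end

puts AggregateByTracker.new(true).every(:count_by).every(:sum_by).can_aggregate_by?  # true
puts AggregateByTracker.new(true).every(:count_by).every.every(:sum_by).can_aggregate_by? # false
puts AggregateByTracker.new(false).every(:count_by).can_aggregate_by?                # false
```

In this model, a single Every without an AggregateBy is enough to turn the optimization off for the whole block, even if later Everies supply one.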

#count(name = 'count') ⇒ Object

Counts the elements of each group. You may optionally specify the name of the output count field, which defaults to ‘count’.

Examples:

assembly 'aggregate' do
  ...
  group_by 'key1', 'key2' do
    count
    count 'key1_key2_count'
  end
end


# File 'lib/cascading/aggregations.rb', line 220

def count(name = 'count')
  count_aggregator = Java::CascadingOperationAggregator::Count.new(fields(name))
  count_by = Java::CascadingPipeAssembly::CountBy.new(fields(name))
  every(last_grouping_fields, :aggregator => count_aggregator, :output => all_fields, :aggregate_by => count_by)
end

#debug_scope ⇒ Object

Prints information about the scope of these Aggregations at the point at which it is called. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.



# File 'lib/cascading/aggregations.rb', line 50

def debug_scope
  puts "Current scope of aggregations for '#{assembly.name}':\n  #{scope}\n----------\n"
end

#every(*args_with_options) ⇒ Object

Builds an every pipe and adds it to the current list of aggregations. Note that this list may be either exactly 1 Buffer or any number of Aggregators. Exactly one of :aggregator or :buffer must be specified and :aggregator may be accompanied by a corresponding :aggregate_by.

The named options are:

aggregator

A Cascading Aggregator, mutually exclusive with :buffer.

aggregate_by

A Cascading AggregateBy that corresponds to the given :aggregator. Only makes sense with the :aggregator option and does not exist for all Aggregators. Providing nothing or nil will cause all Aggregations to operate as normal, without being compiled into a composite AggregateBy.

buffer

A Cascading Buffer, mutually exclusive with :aggregator.

output

c.p.Every output selector.

Example:

every 'field1', 'field2', :aggregator => sum_aggregator, :aggregate_by => sum_by, :output => all_fields
every fields(input_fields), :buffer => Java::SomePackage::SomeBuffer.new, :output => all_fields


# File 'lib/cascading/aggregations.rb', line 90

def every(*args_with_options)
  options, in_fields = args_with_options.extract_options!, fields(args_with_options)
  out_fields = fields(options[:output])
  operation = options[:aggregator] || options[:buffer]
  raise 'every requires either :aggregator or :buffer' unless operation

  if options[:aggregate_by] && aggregate_bys
    aggregate_bys << options[:aggregate_by]
  else
    @aggregate_bys = nil
  end

  parameters = [tail_pipe, in_fields, operation, out_fields].compact
  every = make_pipe(Java::CascadingPipe::Every, parameters)
  raise ':aggregator specified but c.o.Buffer provided' if options[:aggregator] && every.is_buffer
  raise ':buffer specified but c.o.Aggregator provided' if options[:buffer] && every.is_aggregator

  every
end
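The argument handling at the top of #every can be approximated in plain Ruby. This sketch assumes that `extract_options!` removes a trailing Hash from the argument list, as the ActiveSupport method of the same name does. `split_every_args` is a hypothetical helper, and symbols stand in for real Cascading operations and field selectors.

```ruby
# Hypothetical helper: separates field names from trailing options and picks
# the operation the same way the listing does.
def split_every_args(*args_with_options)
  args = args_with_options.dup
  # Assumption: extract_options! pops a trailing Hash, if present.
  options = args.last.is_a?(Hash) ? args.pop : {}
  operation = options[:aggregator] || options[:buffer]
  raise 'every requires either :aggregator or :buffer' unless operation
  [args, operation, options[:output]]
end

p split_every_args('field1', 'field2', :aggregator => :sum_aggregator, :output => :all_fields)
# [["field1", "field2"], :sum_aggregator, :all_fields]

begin
  split_every_args('field1')
rescue RuntimeError => e
  puts e.message # every requires either :aggregator or :buffer
end
```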

#finalize ⇒ Object

Fixes up the outgoing values fields after a sequence of Everies. This is a fix to field name metadata only, which is why the Identity is not planned into the resulting Cascading pipe. Without it, all values fields would propagate through non-empty aggregations, which does not match the behavior of Cascading’s planner.



# File 'lib/cascading/aggregations.rb', line 67

def finalize
  discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
  @scope = Scope.outgoing_scope(discard_each, [scope])
end

#first(*args_with_options) ⇒ Object

Returns the first value within each group for the specified fields. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.

The named options are:

ignore

Java Array of Tuples which should be ignored

Examples:

assembly 'aggregate' do
  ...
  group_by 'key1', 'key2' do
    first 'field1', 'field2'
    first 'field3' => 'fieldA', 'field4' => 'fieldB'
  end
end


# File 'lib/cascading/aggregations.rb', line 186

def first(*args_with_options)
  composite_aggregator(args_with_options, Java::CascadingOperationAggregator::First)
end

#last(*args_with_options) ⇒ Object

Returns the last value within each group for the specified fields. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.

The named options are:

ignore

Java Array of Tuples which should be ignored

Examples:

assembly 'aggregate' do
  ...
  group_by 'key1', 'key2' do
    last 'field1', 'field2'
    last 'field3' => 'fieldA', 'field4' => 'fieldB'
  end
end


# File 'lib/cascading/aggregations.rb', line 205

def last(*args_with_options)
  composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Last)
end

#max(*args_with_options) ⇒ Object

Computes the maxima of the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.

The named options are:

ignore

Java Array of Objects of values to be ignored.

Examples:

assembly 'aggregate' do
  ...
  insert 'const' => 1
  group_by 'const' do
    max 'field1', 'field2'
    max 'field3' => 'fieldA', 'field4' => 'fieldB'
  end
  discard 'const'
end


# File 'lib/cascading/aggregations.rb', line 167

def max(*args_with_options)
  composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Max)
end

#min(*args_with_options) ⇒ Object

Computes the minima of the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.

The named options are:

ignore

Java Array of Objects of values to be ignored.

Examples:

assembly 'aggregate' do
  ...
  insert 'const' => 1
  group_by 'const' do
    min 'field1', 'field2'
    min 'field3' => 'fieldA', 'field4' => 'fieldB'
  end
  discard 'const'
end


# File 'lib/cascading/aggregations.rb', line 146

def min(*args_with_options)
  composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Min)
end

#sum(*args_with_options) ⇒ Object

Sums the specified fields within each group. Fields may be given as a list or, for renaming, through the :mapping option. Note that fields are sorted by input name when a mapping is provided.

The named options are:

mapping

Map of input to output field names if renaming is desired. Results in output fields sorted by input field.

type

Controls the type of the output, specified using values from the Cascading::JAVA_TYPE_MAP as in Janino expressions (:double, :long, etc.)

Examples:

assembly 'aggregate' do
  ...
  group_by 'key1', 'key2' do
    sum 'field1', 'field2', :type => :long
    sum :mapping => { 'field3' => 'fieldA', 'field4' => 'fieldB' }, :type => :double
  end
end


# File 'lib/cascading/aggregations.rb', line 244

def sum(*args_with_options)
  options, in_fields = args_with_options.extract_options!, args_with_options
  type = JAVA_TYPE_MAP[options[:type]]

  mapping = options[:mapping] ? options[:mapping].sort : in_fields.zip(in_fields)
  mapping.each do |in_field, out_field|
    sum_aggregator = Java::CascadingOperationAggregator::Sum.new(*[fields(out_field), type].compact)
    # NOTE: SumBy requires a type in wip-286, unlike Sum (see Sum.java line 42 for default)
    sum_by = Java::CascadingPipeAssembly::SumBy.new(fields(in_field), fields(out_field), type || Java::double.java_class)
    every(in_field, :aggregator => sum_aggregator, :output => all_fields, :aggregate_by => sum_by)
  end
  raise "sum invoked on 0 fields (note :mapping must be provided to explicitly rename fields)" if mapping.empty?
end
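The way #sum turns its arguments into input/output field pairs can be tried in plain Ruby. The expression in this sketch is copied from the listing above. The wrapper `sum_mapping` is a hypothetical name introduced only for this illustration.

```ruby
# Hypothetical wrapper around the mapping expression used by #sum.
def sum_mapping(in_fields, options = {})
  # A :mapping is sorted by input field; a plain list maps each field to itself.
  options[:mapping] ? options[:mapping].sort : in_fields.zip(in_fields)
end

p sum_mapping(['field1', 'field2'])
# [["field1", "field1"], ["field2", "field2"]]

p sum_mapping([], :mapping => { 'field4' => 'fieldB', 'field3' => 'fieldA' })
# [["field3", "fieldA"], ["field4", "fieldB"]]
```

Because a provided :mapping takes precedence in this expression, positional field names passed alongside it are ignored, which is consistent with the error message stating that :mapping must be provided to rename fields.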