Class: Cascading::Aggregations
- Inherits: Object (Object > Cascading::Aggregations)
- Defined in: lib/cascading/aggregations.rb
Overview
Aggregations is the context available to you within the block of a group_by, union, or join that allows you to apply Every pipes to the result of those operations. You may apply aggregators and buffers within this context subject to several rules laid out by Cascading.
Rules enforced by Aggregations:
- Contains either 1 Buffer or >= 1 Aggregator (explicitly checked)
- No GroupBys, CoGroups, Joins, or Merges (methods for these pipes do not exist on Aggregations)
- No Eaches (Aggregations#each does not exist)
- Aggregations may not branch (Aggregations#branch does not exist)
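The first rule can be illustrated with a small plain-Ruby model. This is only a sketch: valid_aggregation_sequence? is a hypothetical helper, and the symbols :aggregator and :buffer stand in for real Cascading operations. It does not cover the empty case, which is handled outside Aggregations.

```ruby
# Hypothetical model of the "1 Buffer or >= 1 Aggregator" rule.
# kinds is an Array of :aggregator / :buffer symbols, one per Every.
def valid_aggregation_sequence?(kinds)
  # Exactly one Buffer and nothing else.
  return true if kinds == [:buffer]

  # Otherwise: one or more operations, all of them Aggregators.
  !kinds.empty? && kinds.all? { |kind| kind == :aggregator }
end

puts valid_aggregation_sequence?([:aggregator, :aggregator]) # prints true
puts valid_aggregation_sequence?([:buffer, :aggregator])     # prints false
```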
Externally enforced rules:
- May be empty (in which case, Aggregations is not instantiated)
- Must follow a GroupBy or CoGroup (not a HashJoin or Merge)
Optimizations:
- If the leading Group is a GroupBy and all subsequent Everies are Aggregators that have a corresponding AggregateBy, Aggregations can replace the GroupBy/Aggregator pipe with a single composite AggregateBy
Aggregator and buffer DSL standard optional parameter names:
- input: c.p.Every argument selector
- into: c.o.Operation field declaration
- output: c.p.Every output selector
Instance Attribute Summary
- #aggregate_bys ⇒ Object (readonly)
  Returns the value of attribute aggregate_bys.
- #assembly ⇒ Object (readonly)
  Returns the value of attribute assembly.
- #scope ⇒ Object (readonly)
  Returns the value of attribute scope.
- #tail_pipe ⇒ Object (readonly)
  Returns the value of attribute tail_pipe.
Instance Method Summary
- #assert_group(assertion, options = {}) ⇒ Object
  Builds an every assertion pipe given a c.o.a.Assertion and adds it to the current list of aggregations.
- #assert_group_size_equals(size, options = {}) ⇒ Object
  Builds a pipe that asserts the size of the current group is the specified size for all groups.
- #average(*fields_or_field_map) ⇒ Object
  Averages the specified fields within each group.
- #can_aggregate_by? ⇒ Boolean
  We can replace these aggregations with the corresponding composite AggregateBy if the leading Group was a GroupBy and all subsequent Aggregators had a corresponding AggregateBy (which we’ve encoded in the list of aggregate_bys being a non-empty array).
- #count(name = 'count') ⇒ Object
  Counts elements of each group.
- #debug_scope ⇒ Object
  Prints information about the scope of these Aggregations at the point at which it is called.
- #every(*args_with_options) ⇒ Object
  Builds an every pipe and adds it to the current list of aggregations.
- #finalize ⇒ Object
  “Fix” out values fields after a sequence of Everies.
- #first(*args_with_options) ⇒ Object
  Returns the first value within each group for the specified fields.
- #initialize(assembly, group, incoming_scopes) ⇒ Aggregations (constructor)
  Do not use this constructor directly; instead, pass a block containing the desired aggregations to a group_by, union, or join and it will be instantiated for you.
- #last(*args_with_options) ⇒ Object
  Returns the last value within each group for the specified fields.
- #max(*args_with_options) ⇒ Object
  Computes the maxima of the specified fields within each group.
- #min(*args_with_options) ⇒ Object
  Computes the minima of the specified fields within each group.
- #sum(*args_with_options) ⇒ Object
  Sums the specified fields within each group.
Constructor Details
#initialize(assembly, group, incoming_scopes) ⇒ Aggregations
Do not use this constructor directly; instead, pass a block containing the desired aggregations to a group_by, union, or join and it will be instantiated for you.
Builds the context in which a sequence of Every aggregations may be evaluated in the given assembly appended to the given group pipe and with the given incoming_scopes.
    # File 'lib/cascading/aggregations.rb', line 37
    def initialize(assembly, group, incoming_scopes)
      @assembly = assembly
      @tail_pipe = group
      @scope = Scope.outgoing_scope(tail_pipe, incoming_scopes)

      # AggregateBy optimization only applies to GroupBy
      @aggregate_bys = tail_pipe.is_group_by ? [] : nil
    end
Instance Attribute Details
#aggregate_bys ⇒ Object (readonly)
Returns the value of attribute aggregate_bys.
    # File 'lib/cascading/aggregations.rb', line 28
    def aggregate_bys
      @aggregate_bys
    end
#assembly ⇒ Object (readonly)
Returns the value of attribute assembly.
    # File 'lib/cascading/aggregations.rb', line 28
    def assembly
      @assembly
    end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
    # File 'lib/cascading/aggregations.rb', line 28
    def scope
      @scope
    end
#tail_pipe ⇒ Object (readonly)
Returns the value of attribute tail_pipe.
    # File 'lib/cascading/aggregations.rb', line 28
    def tail_pipe
      @tail_pipe
    end
Instance Method Details
#assert_group(assertion, options = {}) ⇒ Object
Builds an every assertion pipe given a c.o.a.Assertion and adds it to the current list of aggregations. Note this breaks a chain of AggregateBys.
The named options are:
- level: The assertion level; defaults to strict.
    # File 'lib/cascading/aggregations.rb', line 115
    def assert_group(assertion, options = {})
      assertion_level = options[:level] || Java::CascadingOperation::AssertionLevel::STRICT

      parameters = [tail_pipe, assertion_level, assertion]
      make_pipe(Java::CascadingPipe::Every, parameters)
    end
#assert_group_size_equals(size, options = {}) ⇒ Object
Builds a pipe that asserts the size of the current group is the specified size for all groups.
    # File 'lib/cascading/aggregations.rb', line 124
    def assert_group_size_equals(size, options = {})
      assertion = Java::CascadingOperationAssertion::AssertGroupSizeEquals.new(size)
      assert_group(assertion, options)
    end
#average(*fields_or_field_map) ⇒ Object
Averages the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
Examples:
    assembly 'aggregate' do
      ...
      insert 'const' => 1
      group_by 'const' do
        average 'field1', 'field2'
        average 'field3' => 'fieldA', 'field4' => 'fieldB'
      end
      discard 'const'
    end
    # File 'lib/cascading/aggregations.rb', line 272
    def average(*fields_or_field_map)
      field_map, _ = extract_field_map(fields_or_field_map)

      field_map.each do |in_field, out_field|
        average_aggregator = Java::CascadingOperationAggregator::Average.new(fields(out_field))
        average_by = Java::CascadingPipeAssembly::AverageBy.new(fields(in_field), fields(out_field))
        every(in_field, :aggregator => average_aggregator, :output => all_fields, :aggregate_by => average_by)
      end

      raise "average invoked on 0 fields" if field_map.empty?
    end
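The list-or-map argument convention described above can be sketched in plain Ruby. The helper names normalize_field_map and average_plan are hypothetical, and the body is an assumption about what extract_field_map does rather than a copy of it; in particular it assumes the caller passes either a list of names or a single map, never both.

```ruby
# Hypothetical normalizer; the library's extract_field_map is not shown on this page.
def normalize_field_map(args)
  if args.last.is_a?(Hash)
    # Map form: input => output pairs, ordered by input field name.
    args.last.sort
  else
    # List form: each field keeps its own name, in the order given.
    args.zip(args)
  end
end

# Returns the [input, output] pairs that would each get their own Every.
def average_plan(*fields_or_field_map)
  field_map = normalize_field_map(fields_or_field_map)
  raise ArgumentError, 'average invoked on 0 fields' if field_map.empty?
  field_map
end

p average_plan('field1', 'field2')
p average_plan('field4' => 'fieldB', 'field3' => 'fieldA')
```

In the map form, the pairs come back ordered by input field, which is the behavior the note about sorting refers to.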
#can_aggregate_by? ⇒ Boolean
We can replace these aggregations with the corresponding composite AggregateBy if the leading Group was a GroupBy and all subsequent Aggregators had a corresponding AggregateBy (which we’ve encoded in the list of aggregate_bys being a non-empty array).
    # File 'lib/cascading/aggregations.rb', line 58
    def can_aggregate_by?
      !aggregate_bys.nil? && !aggregate_bys.empty?
    end
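How the aggregate_bys list encodes eligibility may be easier to see in isolation. The following is a minimal sketch: AggregateByTracker is a hypothetical stand-in that keeps only the bookkeeping from the constructor, #every, and #can_aggregate_by?, and uses symbols in place of real AggregateBy instances.

```ruby
# Hypothetical stand-in for Aggregations that models only the aggregate_bys bookkeeping.
class AggregateByTracker
  attr_reader :aggregate_bys

  def initialize(group_by)
    # The optimization only applies to GroupBy; nil means "never eligible".
    @aggregate_bys = group_by ? [] : nil
  end

  # One Every without an AggregateBy disables the optimization for good.
  def every(aggregate_by = nil)
    if aggregate_by && aggregate_bys
      aggregate_bys << aggregate_by
    else
      @aggregate_bys = nil
    end
    self
  end

  def can_aggregate_by?
    !aggregate_bys.nil? && !aggregate_bys.empty?
  end
end

tracker = AggregateByTracker.new(true)
tracker.every(:count_by).every(:sum_by)
puts tracker.can_aggregate_by? # prints true

tracker.every(nil)             # e.g. a buffer or an assertion
puts tracker.can_aggregate_by? # prints false
```

Using nil for "ineligible" and an empty array for "eligible so far, but nothing to combine" is what lets a single predicate cover both the CoGroup case and the empty case.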
#count(name = 'count') ⇒ Object
Counts elements of each group. May optionally specify the name of the output count field, which defaults to ‘count’.
Examples:
    assembly 'aggregate' do
      ...
      group_by 'key1', 'key2' do
        count
        count 'key1_key2_count'
      end
    end
    # File 'lib/cascading/aggregations.rb', line 220
    def count(name = 'count')
      count_aggregator = Java::CascadingOperationAggregator::Count.new(fields(name))
      count_by = Java::CascadingPipeAssembly::CountBy.new(fields(name))
      every(last_grouping_fields, :aggregator => count_aggregator, :output => all_fields, :aggregate_by => count_by)
    end
#debug_scope ⇒ Object
Prints information about the scope of these Aggregations at the point at which it is called. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.
    # File 'lib/cascading/aggregations.rb', line 50
    def debug_scope
      puts "Current scope of aggregations for '#{assembly.name}':\n  #{scope}\n----------\n"
    end
#every(*args_with_options) ⇒ Object
Builds an every pipe and adds it to the current list of aggregations. Note that this list may be either exactly 1 Buffer or one or more Aggregators. Exactly one of :aggregator or :buffer must be specified, and :aggregator may be accompanied by a corresponding :aggregate_by.
The named options are:
- aggregator: A Cascading Aggregator, mutually exclusive with :buffer.
- aggregate_by: A Cascading AggregateBy that corresponds to the given :aggregator. Only makes sense with the :aggregator option and does not exist for all Aggregators. Providing nothing or nil will cause all Aggregations to operate as normal, without being compiled into a composite AggregateBy.
- buffer: A Cascading Buffer, mutually exclusive with :aggregator.
- output: c.p.Every output selector.
Example:
    every 'field1', 'field2', :aggregator => sum_aggregator, :aggregate_by => sum_by, :output => all_fields
    every fields(input_fields), :buffer => Java::SomePackage::SomeBuffer.new, :output => all_fields
    # File 'lib/cascading/aggregations.rb', line 90
    def every(*args_with_options)
      options, in_fields = args_with_options.extract_options!, fields(args_with_options)
      out_fields = fields(options[:output])
      operation = options[:aggregator] || options[:buffer]
      raise 'every requires either :aggregator or :buffer' unless operation

      if options[:aggregate_by] && aggregate_bys
        aggregate_bys << options[:aggregate_by]
      else
        @aggregate_bys = nil
      end

      parameters = [tail_pipe, in_fields, operation, out_fields].compact
      every = make_pipe(Java::CascadingPipe::Every, parameters)
      raise ':aggregator specified but c.o.Buffer provided' if options[:aggregator] && every.is_buffer
      raise ':buffer specified but c.o.Aggregator provided' if options[:buffer] && every.is_aggregator

      every
    end
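The argument handling at the top of #every can be sketched without Cascading on the classpath. Both split_options and check_every_args are hypothetical helpers written for this illustration; they assume options arrive as a trailing Hash and use symbols in place of real operations and selectors.

```ruby
# Hypothetical helper: split a trailing options Hash from positional field names.
def split_options(args)
  args = args.dup
  options = args.last.is_a?(Hash) ? args.pop : {}
  [options, args]
end

# Returns [input fields, operation, output selector], or raises when neither
# :aggregator nor :buffer was given.
def check_every_args(*args_with_options)
  options, in_fields = split_options(args_with_options)
  operation = options[:aggregator] || options[:buffer]
  raise ArgumentError, 'every requires either :aggregator or :buffer' unless operation
  [in_fields, operation, options[:output]]
end

p check_every_args('field1', 'field2', :aggregator => :sum, :output => :all)
```

Note that this check, like the one in the listing above, only rejects the case where neither option is present.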
#finalize ⇒ Object
“Fix” out values fields after a sequence of Everies. This is a field name metadata fix, which is why the Identity is not planned into the resulting Cascading pipe. Without it, all values fields would propagate through non-empty aggregations, which doesn’t match the behavior of Cascading’s planner.
    # File 'lib/cascading/aggregations.rb', line 67
    def finalize
      discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
      @scope = Scope.outgoing_scope(discard_each, [scope])
    end
#first(*args_with_options) ⇒ Object
Returns the first value within each group for the specified fields. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
- ignore: Java Array of Tuples which should be ignored.
Examples:
    assembly 'aggregate' do
      ...
      group_by 'key1', 'key2' do
        first 'field1', 'field2'
        first 'field3' => 'fieldA', 'field4' => 'fieldB'
      end
    end
    # File 'lib/cascading/aggregations.rb', line 186
    def first(*args_with_options)
      composite_aggregator(args_with_options, Java::CascadingOperationAggregator::First)
    end
#last(*args_with_options) ⇒ Object
Returns the last value within each group for the specified fields. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
- ignore: Java Array of Tuples which should be ignored.
Examples:
    assembly 'aggregate' do
      ...
      group_by 'key1', 'key2' do
        last 'field1', 'field2'
        last 'field3' => 'fieldA', 'field4' => 'fieldB'
      end
    end
    # File 'lib/cascading/aggregations.rb', line 205
    def last(*args_with_options)
      composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Last)
    end
#max(*args_with_options) ⇒ Object
Computes the maxima of the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
- ignore: Java Array of Objects of values to be ignored.
Examples:
    assembly 'aggregate' do
      ...
      insert 'const' => 1
      group_by 'const' do
        max 'field1', 'field2'
        max 'field3' => 'fieldA', 'field4' => 'fieldB'
      end
      discard 'const'
    end
    # File 'lib/cascading/aggregations.rb', line 167
    def max(*args_with_options)
      composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Max)
    end
#min(*args_with_options) ⇒ Object
Computes the minima of the specified fields within each group. Fields may be a list or a map for renaming. Note that fields are sorted by input name when a map is provided.
The named options are:
- ignore: Java Array of Objects of values to be ignored.
Examples:
    assembly 'aggregate' do
      ...
      insert 'const' => 1
      group_by 'const' do
        min 'field1', 'field2'
        min 'field3' => 'fieldA', 'field4' => 'fieldB'
      end
      discard 'const'
    end
    # File 'lib/cascading/aggregations.rb', line 146
    def min(*args_with_options)
      composite_aggregator(args_with_options, Java::CascadingOperationAggregator::Min)
    end
#sum(*args_with_options) ⇒ Object
Sums the specified fields within each group. Fields may be given as a list, or through the :mapping option when renaming is desired. Note that output fields are sorted by input field name when a mapping is provided.
The named options are:
- mapping: Map of input to output field names if renaming is desired. Results in output fields sorted by input field.
- type: Controls the type of the output, specified using values from the Cascading::JAVA_TYPE_MAP as in Janino expressions (:double, :long, etc.)
Examples:
    assembly 'aggregate' do
      ...
      group_by 'key1', 'key2' do
        sum 'field1', 'field2', :type => :long
        sum :mapping => { 'field3' => 'fieldA', 'field4' => 'fieldB' }, :type => :double
      end
    end
    # File 'lib/cascading/aggregations.rb', line 244
    def sum(*args_with_options)
      options, in_fields = args_with_options.extract_options!, args_with_options
      type = JAVA_TYPE_MAP[options[:type]]

      mapping = options[:mapping] ? options[:mapping].sort : in_fields.zip(in_fields)
      mapping.each do |in_field, out_field|
        sum_aggregator = Java::CascadingOperationAggregator::Sum.new(*[fields(out_field), type].compact)
        # NOTE: SumBy requires a type in wip-286, unlike Sum (see Sum.java line 42 for default)
        sum_by = Java::CascadingPipeAssembly::SumBy.new(fields(in_field), fields(out_field), type || Java::double.java_class)
        every(in_field, :aggregator => sum_aggregator, :output => all_fields, :aggregate_by => sum_by)
      end

      raise "sum invoked on 0 fields (note :mapping must be provided to explicitly rename fields)" if mapping.empty?
    end
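The mapping expression in the listing above runs in plain Ruby, which makes the ordering behavior easy to check in isolation. In this sketch, sum_mapping is a hypothetical wrapper around that one expression; the field names are taken from the examples.

```ruby
# Hypothetical wrapper around the mapping expression used by #sum.
def sum_mapping(in_fields, options = {})
  # With :mapping, Hash#sort yields [input, output] pairs ordered by input field.
  # Without it, each input field is summed into a field of the same name.
  options[:mapping] ? options[:mapping].sort : in_fields.zip(in_fields)
end

p sum_mapping(['field1', 'field2'])
# prints [["field1", "field1"], ["field2", "field2"]]

p sum_mapping([], :mapping => { 'field4' => 'fieldB', 'field3' => 'fieldA' })
# prints [["field3", "fieldA"], ["field4", "fieldB"]]
```

When :mapping is given, positional fields are not consulted at all, so renaming some fields while keeping others requires listing every field in the mapping.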