Class: Cascading::Aggregations
- Inherits: Object (hierarchy: Object, Cascading::Aggregations)
- Includes: Operations
- Defined in: lib/cascading/aggregations.rb
Overview
Rules enforced by Aggregations:
- Contains either 1 Buffer or >= 1 Aggregator (explicitly checked)
- No GroupBys, CoGroups, Joins, or Merges (methods for these pipes do not exist on Aggregations)
- No Eaches (Aggregations#each does not exist)
- Aggregations may not branch (Aggregations#branch does not exist)
Externally enforced rules:
- May be empty (in which case, Aggregations is not instantiated)
- Must follow a GroupBy or CoGroup (not a Join or Merge)
Optimizations:
- If the leading Group is a GroupBy and all subsequent Everies are Aggregators that have a corresponding AggregateBy, Aggregations can replace the GroupBy/Aggregator pipe with a single composite AggregateBy.
Instance Attribute Summary
- #aggregate_bys ⇒ Object (readonly)
  Returns the value of attribute aggregate_bys.
- #assembly ⇒ Object (readonly)
  Returns the value of attribute assembly.
- #scope ⇒ Object (readonly)
  Returns the value of attribute scope.
- #tail_pipe ⇒ Object (readonly)
  Returns the value of attribute tail_pipe.
Instance Method Summary
- #assert_group(*args) ⇒ Object
- #assert_group_size_equals(*args) ⇒ Object
- #average(*args) ⇒ Object
  Averages one or more fields.
- #can_aggregate_by? ⇒ Boolean
  We can replace these aggregations with the corresponding composite AggregateBy if the leading Group was a GroupBy and all subsequent Aggregators had a corresponding AggregateBy (which we’ve encoded in the list of aggregate_bys being a non-empty array).
- #composite_aggregator(args, function) ⇒ Object
  Builds a series of every pipes for aggregation.
- #count(name = 'count') ⇒ Object
  Counts elements of a group.
- #debug_scope ⇒ Object
- #every(*args) ⇒ Object
  Builds an every pipe and adds it to the current list of aggregations.
- #finalize ⇒ Object
  “Fix” out values fields after a sequence of Everies.
- #first(*args) ⇒ Object
- #initialize(assembly, group, incoming_scopes) ⇒ Aggregations (constructor)
  A new instance of Aggregations.
- #last(*args) ⇒ Object
- #max(*args) ⇒ Object
- #min(*args) ⇒ Object
- #sum(*args) ⇒ Object
  Sums one or more fields.
Methods included from Operations
#aggregator_function, #coerce_to_java, #date_formatter, #date_parser, #expression_filter, #expression_function, #field_joiner, #first_function, #identity, #insert_function, #last_function, #max_function, #min_function, #regex_filter, #regex_generator, #regex_parser, #regex_replace, #regex_split_generator, #regex_splitter, #to_java_comparable_array
Constructor Details
#initialize(assembly, group, incoming_scopes) ⇒ Aggregations
Returns a new instance of Aggregations.
# File 'lib/cascading/aggregations.rb', line 26

def initialize(assembly, group, incoming_scopes)
  @assembly = assembly
  @tail_pipe = group
  @scope = Scope.outgoing_scope(tail_pipe, incoming_scopes)

  # AggregateBy optimization only applies to GroupBy
  @aggregate_bys = tail_pipe.is_group_by ? [] : nil
end
Instance Attribute Details
#aggregate_bys ⇒ Object (readonly)
Returns the value of attribute aggregate_bys.
# File 'lib/cascading/aggregations.rb', line 24

def aggregate_bys
  @aggregate_bys
end
#assembly ⇒ Object (readonly)
Returns the value of attribute assembly.
# File 'lib/cascading/aggregations.rb', line 24

def assembly
  @assembly
end
#scope ⇒ Object (readonly)
Returns the value of attribute scope.
# File 'lib/cascading/aggregations.rb', line 24

def scope
  @scope
end
#tail_pipe ⇒ Object (readonly)
Returns the value of attribute tail_pipe.
# File 'lib/cascading/aggregations.rb', line 24

def tail_pipe
  @tail_pipe
end
Instance Method Details
#assert_group(*args) ⇒ Object
# File 'lib/cascading/aggregations.rb', line 90

def assert_group(*args)
  options = args.extract_options!

  assertion = args[0]
  assertion_level = options[:level] || Java::CascadingOperation::AssertionLevel::STRICT

  parameters = [tail_pipe, assertion_level, assertion]
  make_pipe(Java::CascadingPipe::Every, parameters)
end
#assert_group_size_equals(*args) ⇒ Object
# File 'lib/cascading/aggregations.rb', line 100

def assert_group_size_equals(*args)
  options = args.extract_options!

  assertion = Java::CascadingOperationAssertion::AssertGroupSizeEquals.new(args[0])
  assert_group(assertion, options)
end
#average(*args) ⇒ Object
Averages one or more fields. The contract of average is identical to that of other composite aggregators, but it accepts no options.
# File 'lib/cascading/aggregations.rb', line 163

def average(*args)
  field_map, _ = extract_field_map(args)

  field_map.each do |in_field, out_field|
    average_aggregator = Java::CascadingOperationAggregator::Average.new(fields(out_field))
    average_by = Java::CascadingPipeAssembly::AverageBy.new(fields(in_field), fields(out_field))
    every(in_field, :aggregator => average_aggregator, :output => all_fields, :aggregate_by => average_by)
  end

  raise "average invoked on 0 fields" if field_map.empty?
end
#can_aggregate_by? ⇒ Boolean
We can replace these aggregations with the corresponding composite AggregateBy if the leading Group was a GroupBy and all subsequent Aggregators had a corresponding AggregateBy (which we’ve encoded in the list of aggregate_bys being a non-empty array).
# File 'lib/cascading/aggregations.rb', line 56

def can_aggregate_by?
  !aggregate_bys.nil? && !aggregate_bys.empty?
end
#composite_aggregator(args, function) ⇒ Object
Builds a series of every pipes for aggregation.
Args can be either a list of fields to aggregate followed by an options hash, or a hash that maps input field names to output field names (similar to insert) followed by an options hash.
Options include:
- :ignore
  a Java Array of Objects (for min and max) or Tuples (for first and last) of values for the aggregator to ignore

function is a symbol naming the method to call to construct the Cascading Aggregator.
# File 'lib/cascading/aggregations.rb', line 118

def composite_aggregator(args, function)
  field_map, options = extract_field_map(args)

  field_map.each do |in_field, out_field|
    agg = self.send(function, out_field, options)
    every(in_field, :aggregator => agg, :output => all_fields)
  end

  raise "Composite aggregator '#{function.to_s.gsub('_function', '')}' invoked on 0 fields" if field_map.empty?
end
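The two argument shapes accepted by composite_aggregator can be illustrated in plain Ruby. The helper below, sketch_field_map, is hypothetical: it is not the library's extract_field_map (whose implementation is not shown on this page), and it assumes that a leading hash is the field map and a trailing hash is the options hash.

```ruby
# Hypothetical stand-in, NOT the library's extract_field_map.
# Assumption: a leading Hash is the field map, a trailing Hash is the options.
def sketch_field_map(args)
  args = args.dup # leave the caller's array untouched
  if args.first.is_a?(Hash)
    # Hash form: each input field name maps to an output field name.
    field_map = args.shift.to_a
    options = args.last.is_a?(Hash) ? args.pop : {}
  else
    # List form: each field is aggregated into a field of the same name.
    options = args.last.is_a?(Hash) ? args.pop : {}
    field_map = args.zip(args)
  end
  [field_map, options]
end

# List of fields plus options
list_map, list_options = sketch_field_map(['price', 'quantity', { :ignore => [nil] }])
p list_map

# Input-to-output hash without options
hash_map, hash_options = sketch_field_map([{ 'price' => 'max_price' }])
p hash_map
```

In both forms the result is a list of [in_field, out_field] pairs, which is the shape the field_map.each loop in composite_aggregator iterates over.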
#count(name = 'count') ⇒ Object
Counts elements of a group. The name of the output count field may optionally be specified (defaults to ‘count’).
# File 'lib/cascading/aggregations.rb', line 135

def count(name = 'count')
  count_aggregator = Java::CascadingOperationAggregator::Count.new(fields(name))
  count_by = Java::CascadingPipeAssembly::CountBy.new(fields(name))
  every(last_grouping_fields, :aggregator => count_aggregator, :output => all_fields, :aggregate_by => count_by)
end
#debug_scope ⇒ Object
# File 'lib/cascading/aggregations.rb', line 35

def debug_scope
  puts "Current scope of aggregations for '#{assembly.name}':\n  #{scope}\n----------\n"
end
#every(*args) ⇒ Object
Builds an every pipe and adds it to the current list of aggregations. Note that this list may be either exactly 1 Buffer or any number of Aggregators.
# File 'lib/cascading/aggregations.rb', line 73

def every(*args)
  options = args.extract_options!

  in_fields = fields(args)
  out_fields = fields(options[:output])
  operation = options[:aggregator] || options[:buffer]

  if options[:aggregate_by] && aggregate_bys
    aggregate_bys << options[:aggregate_by]
  else
    @aggregate_bys = nil
  end

  parameters = [tail_pipe, in_fields, operation, out_fields].compact
  make_pipe(Java::CascadingPipe::Every, parameters)
end
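The interaction between the constructor, every, and can_aggregate_by? amounts to a small state machine over aggregate_bys: nil means the optimization is ruled out, an empty array means nothing has been recorded yet, and a non-empty array means every aggregation so far has an AggregateBy. The following plain-Ruby sketch mirrors that bookkeeping; the AggregateByTracker class and its record method are hypothetical names used only for illustration, and symbols stand in for real AggregateBy instances.

```ruby
# Hypothetical stand-in for the aggregate_bys bookkeeping; no Cascading classes involved.
class AggregateByTracker
  attr_reader :aggregate_bys

  # group_by: true when the leading Group is a GroupBy (mirrors tail_pipe.is_group_by).
  def initialize(group_by)
    @aggregate_bys = group_by ? [] : nil
  end

  # Mirrors the branch in #every: collect the AggregateBy when one is supplied and
  # the optimization is still possible; otherwise rule the optimization out.
  def record(aggregate_by)
    if aggregate_by && aggregate_bys
      aggregate_bys << aggregate_by
    else
      @aggregate_bys = nil
    end
  end

  def can_aggregate_by?
    !aggregate_bys.nil? && !aggregate_bys.empty?
  end
end

all_optimizable = AggregateByTracker.new(true)
all_optimizable.record(:count_by)
all_optimizable.record(:sum_by)

mixed = AggregateByTracker.new(true)
mixed.record(:count_by)
mixed.record(nil)      # an operation without an AggregateBy, such as a Buffer
mixed.record(:sum_by)  # has no effect: the list is already nil

co_group = AggregateByTracker.new(false)
co_group.record(:count_by)

puts all_optimizable.can_aggregate_by? # true
puts mixed.can_aggregate_by?           # false
puts co_group.can_aggregate_by?        # false
```

Once aggregate_bys has been set to nil it stays nil, so a single operation without an AggregateBy disables the optimization for the whole Aggregations block.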
#finalize ⇒ Object
“Fix” out values fields after a sequence of Everies. This is a field name metadata fix, which is why the Identity is not planned into the resulting Cascading pipe. Without it, all values fields would propagate through non-empty aggregations, which does not match the behavior of Cascading’s planner.
# File 'lib/cascading/aggregations.rb', line 65

def finalize
  discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
  @scope = Scope.outgoing_scope(discard_each, [scope])
end
#first(*args) ⇒ Object
# File 'lib/cascading/aggregations.rb', line 130

def first(*args); composite_aggregator(args, :first_function); end
#last(*args) ⇒ Object
# File 'lib/cascading/aggregations.rb', line 131

def last(*args); composite_aggregator(args, :last_function); end
#max(*args) ⇒ Object
# File 'lib/cascading/aggregations.rb', line 129

def max(*args); composite_aggregator(args, :max_function); end
#min(*args) ⇒ Object
# File 'lib/cascading/aggregations.rb', line 128

def min(*args); composite_aggregator(args, :min_function); end
#sum(*args) ⇒ Object
Sums one or more fields. Fields to be summed may be provided either as the arguments to sum (in which case each is aggregated into a field of the same name, in the given order) or as a hash via the :mapping parameter (in which case each is aggregated from the field named by the key into the field named by the value, after the hash entries are sorted). The type of the output sum may be controlled with the :type parameter.
# File 'lib/cascading/aggregations.rb', line 147

def sum(*args)
  options = args.extract_options!
  type = JAVA_TYPE_MAP[options[:type]]
  mapping = options[:mapping] ? options[:mapping].sort : args.zip(args)

  mapping.each do |in_field, out_field|
    sum_aggregator = Java::CascadingOperationAggregator::Sum.new(*[fields(out_field), type].compact)
    # NOTE: SumBy requires a type in wip-286, unlike Sum (see Sum.java line 42 for default)
    sum_by = Java::CascadingPipeAssembly::SumBy.new(fields(in_field), fields(out_field), type || Java::double.java_class)
    every(in_field, :aggregator => sum_aggregator, :output => all_fields, :aggregate_by => sum_by)
  end

  raise "sum invoked on 0 fields (note :mapping must be provided to explicitly rename fields)" if mapping.empty?
end
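The way sum derives its input-to-output field pairs can be tried in isolation. The sketch below copies only the mapping expression from the listing above into a hypothetical sum_mapping helper; the field names are made up, and no Cascading classes are involved.

```ruby
# Hypothetical helper isolating the mapping expression used by #sum.
def sum_mapping(args, options = {})
  # With :mapping, the hash entries are sorted and the positional args are ignored.
  # Without it, each field is paired with itself, in the order given.
  options[:mapping] ? options[:mapping].sort : args.zip(args)
end

positional = sum_mapping(['views', 'clicks'])
p positional # [["views", "views"], ["clicks", "clicks"]]

renamed = sum_mapping([], :mapping => { 'views' => 'total_views', 'clicks' => 'total_clicks' })
p renamed # [["clicks", "total_clicks"], ["views", "total_views"]]
```

Sorting the :mapping hash presumably keeps the order of the generated pipes stable regardless of how the hash was built, whereas positional arguments preserve the caller's order. An empty result in either form is what triggers the "sum invoked on 0 fields" error.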