Class: Cascading::Aggregations

Inherits:
Object
  • Object
Includes:
Operations
Defined in:
lib/cascading/aggregations.rb

Overview

Rules enforced by Aggregations:

  • Contains either 1 Buffer or >= 1 Aggregator (explicitly checked)

  • No GroupBys, CoGroups, Joins, or Merges (methods for these pipes do not exist on Aggregations)

  • No Eaches (Aggregations#each does not exist)

  • Aggregations may not branch (Aggregations#branch does not exist)

Externally enforced rules:

  • May be empty (in which case, Aggregations is not instantiated)

  • Must follow a GroupBy or CoGroup (not a Join or Merge)

Optimizations:

  • If the leading Group is a GroupBy and all subsequent Everies are Aggregators that have a corresponding AggregateBy, Aggregations can replace the GroupBy/Aggregator pipe with a single composite AggregateBy

Methods included from Operations

#aggregator_function, #coerce_to_java, #date_formatter, #date_parser, #expression_filter, #expression_function, #field_joiner, #first_function, #identity, #insert_function, #last_function, #max_function, #min_function, #regex_filter, #regex_generator, #regex_parser, #regex_replace, #regex_split_generator, #regex_splitter, #to_java_comparable_array

Constructor Details

#initialize(assembly, group, incoming_scopes) ⇒ Aggregations

Returns a new instance of Aggregations.



# File 'lib/cascading/aggregations.rb', line 26

def initialize(assembly, group, incoming_scopes)
  @assembly = assembly
  @tail_pipe = group
  @scope = Scope.outgoing_scope(tail_pipe, incoming_scopes)

  # AggregateBy optimization only applies to GroupBy
  @aggregate_bys = tail_pipe.is_group_by ? [] : nil
end

Instance Attribute Details

#aggregate_bys ⇒ Object (readonly)

Returns the value of attribute aggregate_bys.



# File 'lib/cascading/aggregations.rb', line 24

def aggregate_bys
  @aggregate_bys
end

#assembly ⇒ Object (readonly)

Returns the value of attribute assembly.



# File 'lib/cascading/aggregations.rb', line 24

def assembly
  @assembly
end

#scope ⇒ Object (readonly)

Returns the value of attribute scope.



# File 'lib/cascading/aggregations.rb', line 24

def scope
  @scope
end

#tail_pipe ⇒ Object (readonly)

Returns the value of attribute tail_pipe.



# File 'lib/cascading/aggregations.rb', line 24

def tail_pipe
  @tail_pipe
end

Instance Method Details

#assert_group(*args) ⇒ Object



# File 'lib/cascading/aggregations.rb', line 90

def assert_group(*args)
  options = args.extract_options!

  assertion = args[0]
  assertion_level = options[:level] || Java::CascadingOperation::AssertionLevel::STRICT

  parameters = [tail_pipe, assertion_level, assertion]
  make_pipe(Java::CascadingPipe::Every, parameters)
end

#assert_group_size_equals(*args) ⇒ Object



# File 'lib/cascading/aggregations.rb', line 100

def assert_group_size_equals(*args)
  options = args.extract_options!

  assertion = Java::CascadingOperationAssertion::AssertGroupSizeEquals.new(args[0])
  assert_group(assertion, options)
end

#average(*args) ⇒ Object

Averages one or more fields. The contract of average is identical to that of other composite aggregators, but it accepts no options.



# File 'lib/cascading/aggregations.rb', line 163

def average(*args)
  field_map, _ = extract_field_map(args)

  field_map.each do |in_field, out_field|
    average_aggregator = Java::CascadingOperationAggregator::Average.new(fields(out_field))
    average_by = Java::CascadingPipeAssembly::AverageBy.new(fields(in_field), fields(out_field))
    every(in_field, :aggregator => average_aggregator, :output => all_fields, :aggregate_by => average_by)
  end
  raise "average invoked on 0 fields" if field_map.empty?
end

#can_aggregate_by? ⇒ Boolean

We can replace these aggregations with the corresponding composite AggregateBy if the leading Group was a GroupBy and all subsequent Aggregators had a corresponding AggregateBy. This condition is encoded by aggregate_bys being a non-empty array: it is nil when the leading Group is not a GroupBy or when any Every was added without an AggregateBy.

Returns:

  • (Boolean)


# File 'lib/cascading/aggregations.rb', line 56

def can_aggregate_by?
  !aggregate_bys.nil? && !aggregate_bys.empty?
end
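
The bookkeeping behind this predicate can be sketched in plain Ruby, without JRuby or Cascading on the classpath. AggregateByTracker and its group_by flag are hypothetical names introduced only for illustration. The branches are copied from the initialize, every, and can_aggregate_by? sources shown on this page, and the symbols stand in for real AggregateBy instances.

```ruby
# Hypothetical stand-in for the aggregate_bys bookkeeping; not the library class.
class AggregateByTracker
  attr_reader :aggregate_bys

  # group_by: true when the leading group is a GroupBy (assumed flag that
  # replaces the tail_pipe.is_group_by check in Aggregations#initialize).
  def initialize(group_by)
    @aggregate_bys = group_by ? [] : nil
  end

  # Same branch as Aggregations#every: a single Every without an
  # AggregateBy disables the optimization for the rest of the block.
  def every(aggregate_by = nil)
    if aggregate_by && aggregate_bys
      aggregate_bys << aggregate_by
    else
      @aggregate_bys = nil
    end
    self
  end

  def can_aggregate_by?
    !aggregate_bys.nil? && !aggregate_bys.empty?
  end
end

# GroupBy followed only by Everies that supply an AggregateBy.
puts AggregateByTracker.new(true).every(:count_by).every(:sum_by).can_aggregate_by?
# GroupBy with one Every that has no AggregateBy (for example, a Buffer).
puts AggregateByTracker.new(true).every(:count_by).every.can_aggregate_by?
# CoGroup: the list starts as nil and stays nil.
puts AggregateByTracker.new(false).every(:count_by).can_aggregate_by?
```

Only the first case reports true. An empty block after a GroupBy also reports false, because the list is empty rather than nil.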

#composite_aggregator(args, function) ⇒ Object

Builds a series of every pipes for aggregation.

Args can be either a list of fields to aggregate followed by an options hash, or a hash that maps input field names to output field names (similar to insert) followed by an options hash.

Options include:

  • :ignore a Java Array of Objects (for min and max) or Tuples (for first and last) of values for the aggregator to ignore

function is a symbol naming the method to call to construct the Cascading Aggregator.



# File 'lib/cascading/aggregations.rb', line 118

def composite_aggregator(args, function)
  field_map, options = extract_field_map(args)

  field_map.each do |in_field, out_field|
    agg = self.send(function, out_field, options)
    every(in_field, :aggregator => agg, :output => all_fields)
  end
  raise "Composite aggregator '#{function.to_s.gsub('_function', '')}' invoked on 0 fields" if field_map.empty?
end
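
The source of extract_field_map is not shown on this page, so the following is only one plausible reading of the two argument forms described above. sketch_field_map is a hypothetical helper, and sorting the hash form is an assumption made by analogy with how sum handles :mapping.

```ruby
# Hypothetical illustration of the two argument forms accepted by
# composite_aggregator; the real extract_field_map may differ.
def sketch_field_map(args)
  args = args.dup # leave the caller's array untouched
  if args.first.is_a?(Hash)
    # Hash form: input name => output name. Sorting is an assumption.
    field_map = args.shift.sort
    options = args.last.is_a?(Hash) ? args.pop : {}
  else
    # List form: a trailing hash, if any, is treated as options and
    # each remaining field is aggregated into a field of the same name.
    options = args.last.is_a?(Hash) ? args.pop : {}
    field_map = args.zip(args)
  end
  [field_map, options]
end

p sketch_field_map(['a', 'b', { :ignore => [nil] }])
p sketch_field_map([{ 'b' => 'b_max', 'a' => 'a_max' }])
```

Under these assumptions, each [in_field, out_field] pair corresponds to one every call in the loop above, and an empty field map triggers the "invoked on 0 fields" error.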

#count(name = 'count') ⇒ Object

Counts elements of a group. The name of the output count field may optionally be specified (defaults to ‘count’).



# File 'lib/cascading/aggregations.rb', line 135

def count(name = 'count')
  count_aggregator = Java::CascadingOperationAggregator::Count.new(fields(name))
  count_by = Java::CascadingPipeAssembly::CountBy.new(fields(name))
  every(last_grouping_fields, :aggregator => count_aggregator, :output => all_fields, :aggregate_by => count_by)
end

#debug_scope ⇒ Object



# File 'lib/cascading/aggregations.rb', line 35

def debug_scope
  puts "Current scope of aggregations for '#{assembly.name}':\n  #{scope}\n----------\n"
end

#every(*args) ⇒ Object

Builds an every pipe and adds it to the current list of aggregations. Note that this list may be either exactly 1 Buffer or any number of Aggregators.



# File 'lib/cascading/aggregations.rb', line 73

def every(*args)
  options = args.extract_options!

  in_fields = fields(args)
  out_fields = fields(options[:output])
  operation = options[:aggregator] || options[:buffer]

  if options[:aggregate_by] && aggregate_bys
    aggregate_bys << options[:aggregate_by]
  else
    @aggregate_bys = nil
  end

  parameters = [tail_pipe, in_fields, operation, out_fields].compact
  make_pipe(Java::CascadingPipe::Every, parameters)
end

#finalize ⇒ Object

“Fix” the output values fields after a sequence of Everies. This is a fix to field name metadata only, which is why the Identity is not planned into the resulting Cascading pipe. Without it, all values fields would propagate through non-empty aggregations, which does not match the behavior of Cascading’s planner.



# File 'lib/cascading/aggregations.rb', line 65

def finalize
  discard_each = Java::CascadingPipe::Each.new(tail_pipe, all_fields, Java::CascadingOperation::Identity.new)
  @scope = Scope.outgoing_scope(discard_each, [scope])
end

#first(*args) ⇒ Object



# File 'lib/cascading/aggregations.rb', line 130

def first(*args); composite_aggregator(args, :first_function); end

#last(*args) ⇒ Object



# File 'lib/cascading/aggregations.rb', line 131

def last(*args); composite_aggregator(args, :last_function); end

#max(*args) ⇒ Object



# File 'lib/cascading/aggregations.rb', line 129

def max(*args); composite_aggregator(args, :max_function); end

#min(*args) ⇒ Object



# File 'lib/cascading/aggregations.rb', line 128

def min(*args); composite_aggregator(args, :min_function); end

#sum(*args) ⇒ Object

Sums one or more fields. Fields to be summed may be provided in one of two ways. If given as the arguments to sum, each field is aggregated into a field of the same name, in the given order. If given as a hash via the :mapping parameter, each field named by a key is aggregated into the field named by its value, after the mapping is sorted; positional arguments are ignored when :mapping is present. The type of the output sum may be controlled with the :type parameter.



# File 'lib/cascading/aggregations.rb', line 147

def sum(*args)
  options = args.extract_options!
  type = JAVA_TYPE_MAP[options[:type]]

  mapping = options[:mapping] ? options[:mapping].sort : args.zip(args)
  mapping.each do |in_field, out_field|
    sum_aggregator = Java::CascadingOperationAggregator::Sum.new(*[fields(out_field), type].compact)
    # NOTE: SumBy requires a type in wip-286, unlike Sum (see Sum.java line 42 for default)
    sum_by = Java::CascadingPipeAssembly::SumBy.new(fields(in_field), fields(out_field), type || Java::double.java_class)
    every(in_field, :aggregator => sum_aggregator, :output => all_fields, :aggregate_by => sum_by)
  end
  raise "sum invoked on 0 fields (note :mapping must be provided to explicitly rename fields)" if mapping.empty?
end
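
The field pairing used by sum can be tried in plain Ruby. sum_mapping is a hypothetical wrapper around the mapping expression taken verbatim from the source above; it leaves out the Cascading Sum and SumBy construction.

```ruby
# Standalone restatement of the mapping expression in Aggregations#sum.
# sum_mapping is a hypothetical name; only the expression is from the source.
def sum_mapping(args, options = {})
  options[:mapping] ? options[:mapping].sort : args.zip(args)
end

# Positional fields keep their names and their order.
p sum_mapping(['x', 'y'])
# => [["x", "x"], ["y", "y"]]

# A :mapping hash renames fields and is sorted by input field name.
p sum_mapping([], :mapping => { 'y' => 'y_sum', 'x' => 'x_sum' })
# => [["x", "x_sum"], ["y", "y_sum"]]

# When :mapping is present, positional fields are not consulted at all.
p sum_mapping(['z'], :mapping => { 'x' => 'total' })
# => [["x", "total"]]
```

An empty result from this expression is what triggers the "sum invoked on 0 fields" error at the end of the method.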