Class: Piglet::Relation::Cogroup

Inherits:
Object
  • Object
show all
Includes:
Relation
Defined in:
lib/piglet/relation/cogroup.rb

Overview

:nodoc:

Instance Attribute Summary

Attributes included from Relation

#sources

Instance Method Summary collapse

Methods included from Relation

#[], #alias, #cogroup, #cross, #distinct, #eql?, #field, #filter, #foreach, #group, #hash, #join, #limit, #method_missing, #nested_foreach, #next_field_alias, #order, #sample, #split, #stream, #union

Constructor Details

#initialize(relation, interpreter, description) ⇒ Cogroup

Returns a new instance of Cogroup.



8
9
10
11
12
13
# File 'lib/piglet/relation/cogroup.rb', line 8

# Sets up a COGROUP over every relation that appears as a key of +description+.
#
# relation    - accepted for the constructor signature; not read here.
# interpreter - stored unchanged in @interpreter.
# description - Hash-like mapping. Entries whose key is a Relation are kept as
#               the join specification (relation => join fields); the value
#               stored under :parallel (nil when absent) becomes @parallel.
def initialize(relation, interpreter, description)
  # Keys that are not relations (for example :parallel) are options, not sources.
  @join_fields = description.select { |key, _fields| key.is_a?(Relation) }
  @sources = @join_fields.keys
  @interpreter = interpreter
  @parallel = description[:parallel]
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the included module Piglet::Relation::Relation.

Instance Method Details

#schema ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/piglet/relation/cogroup.rb', line 15

# Builds the schema of the relation produced by this COGROUP.
#
# The description passed to Piglet::Schema::Tuple.parse starts with a :group
# entry derived from the join fields of the first source, followed by one
# entry per source: the source's alias (as a symbol) paired with a
# Piglet::Schema::Bag wrapping that source's schema.
#
# A trailing :inner / :outer in the join-field list is a join modifier, not a
# field name (#to_s strips it the same way), so it is removed before the
# group key is described.
#
# Returns whatever Piglet::Schema::Tuple.parse returns for the description.
def schema
  first_source = @sources.first
  first_schema = first_source.schema
  join_fields = @join_fields[first_source]
  # Same condition #to_s uses to recognise a trailing INNER/OUTER modifier.
  if join_fields.is_a?(Enumerable) && join_fields.size > 1 &&
     (join_fields.last == :inner || join_fields.last == :outer)
    join_fields = join_fields[0..-2]
  end
  if join_fields.is_a?(Enumerable) && join_fields.size > 1
    # Several join fields: the group key is a tuple of those fields.
    # NOTE(review): assumes field_type returns something indexable by field
    # name — confirm against Piglet::Schema::Tuple.
    group_type = join_fields.map { |f| [f, first_schema.field_type[f]] }
    description = [[:group, :tuple, group_type]]
  else
    # Single join field (bare value or one-element list).
    description = [[:group, *join_fields]]
  end
  @sources.each do |source|
    description << [source.alias.to_sym, Piglet::Schema::Bag.new(source.schema)]
  end
  Piglet::Schema::Tuple.parse(description)
end

#to_s ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/piglet/relation/cogroup.rb', line 30

# Renders this relation as a Pig Latin COGROUP statement.
#
# Each source contributes "<alias> BY <field>" for a single join field or
# "<alias> BY (<f1>, <f2>, ...)" for several. When a source's field list has
# more than one element and ends in :inner or :outer, that last element is
# taken as a modifier and appended in upper case. " PARALLEL <n>" is added
# when @parallel is set.
#
# Returns the statement as a String.
def to_s
  clauses = @sources.map do |source|
    keys = @join_fields[source]
    modifier = nil
    if keys.is_a?(Enumerable) && keys.size > 1
      tail = keys.last
      if tail == :inner || tail == :outer
        modifier = tail.to_s.upcase
        keys = keys[0..-2]
      end
    end
    # A one-element list is rendered like a bare field, without parentheses.
    keys = keys.first if keys.is_a?(Enumerable) && keys.size == 1
    clause = keys.is_a?(Enumerable) ? "#{source.alias} BY (#{keys.join(', ')})" : "#{source.alias} BY #{keys}"
    clause << " #{modifier}" if modifier
    clause
  end
  statement = "COGROUP #{clauses.join(', ')}"
  @parallel ? statement << " PARALLEL #{@parallel}" : statement
end