Class: MaxUniqueSumReduce

Inherits:
ReduceBase
Defined in:
lib/mrtoolkit.rb

Overview

Reducer that sums the values for each unique key and outputs only the M keys with the largest sums.

Instance Attribute Summary

Attributes inherited from Stage

#errors, #in_fields, #in_sep, #out_fields, #out_sep

Instance Method Summary

Methods inherited from ReduceBase

#process, #process_end_internal, #process_internal, #run

Methods inherited from Stage

#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out

Constructor Details

#initialize(*args) ⇒ MaxUniqueSumReduce

Returns a new instance of MaxUniqueSumReduce.

Raises:

  • (ArgumentError)


624
625
626
627
# File 'lib/mrtoolkit.rb', line 624

# Builds a reducer that keeps only the M keys with the largest summed values.
#
# @param args [Array] the first element is M, the pool size. It is converted
#   with +to_i+, so a non-numeric value silently becomes 0. Extra elements
#   are ignored.
# @raise [ArgumentError] if no arguments are given
def initialize(*args)
  raise ArgumentError, 'MaxUniqueSumReduce requires M (the number of results to keep)' if args.empty?

  @m = args.first.to_i
end

Instance Method Details

#declare ⇒ Object



629
630
631
632
633
634
635
# File 'lib/mrtoolkit.rb', line 629

# Declares the record layout: each input record has a key and a value, and
# each output record carries the same two columns.
#
# @return [Object] whatever the final +emit+ call returns
def declare
  columns = %i[key value]
  columns.each { |name| field name }
  # map(...).last keeps the method's result equal to `emit :value`.
  columns.map { |name| emit name }.last
end

#process_begin(dummy, output) ⇒ Object



641
642
643
644
# File 'lib/mrtoolkit.rb', line 641

# Start-of-run hook: resets the top-M pool of [key, sum] pairs.
#
# @param dummy [Object] unused
# @param output [Object] unused
# @return [nil] nothing is emitted at the start of the run
def process_begin(dummy, output)
  # A fresh, empty pool; entries are added in process_term.
  @pool = []

  nil
end

#process_each(input, output) ⇒ Object



650
651
652
653
# File 'lib/mrtoolkit.rb', line 650

# Per-record hook: adds this record's value to the running sum for the
# current key. The value is converted with +to_i+, so non-numeric text
# contributes 0.
#
# @param input [Object] record exposing +value+
# @param output [Object] unused
# @return [nil] nothing is emitted per record
def process_each(input, output)
  amount = input.value.to_i
  @sum += amount
  nil
end

#process_end(dummy, output) ⇒ Object



664
665
666
667
668
669
670
671
672
# File 'lib/mrtoolkit.rb', line 664

# End-of-run hook: converts the pool of [key, sum] pairs into output
# records, keeping the pool's order (largest sum first, as maintained by
# sort_pool).
#
# @param dummy [Object] unused
# @param output [Object] unused; not modified (the original shadowed it with
#   a local array, which this version avoids)
# @return [Array] one record from +new_output+ per pool entry, with +key+
#   and +value+ set
def process_end(dummy, output)
  @pool.map do |key, sum|
    item = new_output
    item.key = key
    item.value = sum
    item
  end
end

#process_init(input, output) ⇒ Object

These three methods (process_init, process_each and process_term) compute the sum for each key.



646
647
648
649
# File 'lib/mrtoolkit.rb', line 646

# Start-of-key hook: resets the running sum before the values of a new key
# are accumulated by process_each.
#
# @param input [Object] unused
# @param output [Object] unused
# @return [nil] nothing is emitted when a key starts
def process_init(input, output)
  # Each key's total starts from zero.
  @sum = 0

  nil
end

#process_term(dummy, output) ⇒ Object



654
655
656
657
658
659
660
661
662
663
# File 'lib/mrtoolkit.rb', line 654

# End-of-key hook: offers the pair [@last, @sum] to the top-M pool.
# While the pool holds fewer than M entries the pair is always added. Once
# the pool is full, the pair replaces the smallest entry (the pool is kept
# sorted largest-first, so that is the last one) only if its sum is strictly
# larger. On ties, the key already in the pool is kept.
#
# NOTE(review): @last is not assigned in this class; presumably ReduceBase
# sets it to the current key before calling this hook. TODO confirm.
#
# @param dummy [Object] unused
# @param output [Object] unused
# @return [nil] nothing is emitted until process_end
def process_term(dummy, output)
  # With M <= 0 (e.g. a non-numeric M that to_i turned into 0) nothing may
  # enter the pool. The pool therefore stays empty, and @pool[-1] would be
  # nil in the comparison below.
  return nil if @m <= 0

  if @pool.size < @m
    @pool << [@last, @sum]
    sort_pool
  elsif @sum > @pool.last[1]
    @pool[-1] = [@last, @sum]
    sort_pool
  end
  nil
end

#sort_pool ⇒ Object



637
638
639
# File 'lib/mrtoolkit.rb', line 637

# Sorts the pool of [key, sum] pairs in place by sum, largest first. This
# leaves the smallest sum at the end, where process_term checks for
# eviction.
#
# @return [Array<Array>] the pool itself, after sorting
def sort_pool
  @pool.sort! { |left, right| right.last <=> left.last }
end