Class: MaxUniqueSumReduce
- Inherits: ReduceBase
- Inheritance chain: Object < Stage < ReduceBase < MaxUniqueSumReduce
- Defined in: lib/mrtoolkit.rb
Overview
Reducer that sums the values for each unique key and outputs only the M keys with the largest sums. M is the first constructor argument.
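The overall effect can be illustrated outside MRToolkit. The following is a minimal standalone sketch, not the library's implementation: `top_m_sums` is a hypothetical helper, and the input is assumed to be an array of `[key, value]` pairs with string values, similar to what a reducer would read.

```ruby
# Standalone illustration only; `top_m_sums` is a hypothetical helper,
# not part of MRToolkit.
def top_m_sums(pairs, m)
  sums = Hash.new(0)
  # Accumulate an integer sum per key (values arrive as strings).
  pairs.each { |key, value| sums[key] += value.to_i }
  # Order by sum, largest first, and keep the first m entries.
  sums.sort_by { |_key, sum| -sum }.first(m)
end

pairs = [["a", "1"], ["a", "2"], ["b", "10"], ["c", "4"], ["c", "1"]]
result = top_m_sums(pairs, 2)
p result
```

The class itself does not build a hash of all keys; as the method listings show, it keeps only a pool of at most M entries.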
Instance Attribute Summary
Attributes inherited from Stage
#errors, #in_fields, #in_sep, #out_fields, #out_sep
Instance Method Summary
- #declare ⇒ Object
- #initialize(*args) ⇒ MaxUniqueSumReduce (constructor): A new instance of MaxUniqueSumReduce.
- #process_begin(dummy, output) ⇒ Object
- #process_each(input, output) ⇒ Object
- #process_end(dummy, output) ⇒ Object
- #process_init(input, output) ⇒ Object: Together with #process_each and #process_term, does the sum.
- #process_term(dummy, output) ⇒ Object
- #sort_pool ⇒ Object
Methods inherited from ReduceBase
#process, #process_end_internal, #process_internal, #run
Methods inherited from Stage
#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out
Constructor Details
#initialize(*args) ⇒ MaxUniqueSumReduce
Returns a new instance of MaxUniqueSumReduce.
    # File 'lib/mrtoolkit.rb', line 624
    def initialize(*args)
      raise ArgumentError if args.size < 1
      @m = args[0].to_i
    end
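The constructor's argument handling can be tried in isolation. In this sketch, `TopMConfig` is a hypothetical stand-in class that copies only the two lines of the constructor body; it shows that the first argument is coerced with `to_i`, so a non-numeric string silently becomes 0.

```ruby
# Hypothetical stand-in that mirrors only the constructor body shown above.
class TopMConfig
  attr_reader :m

  def initialize(*args)
    # At least one argument (the pool size M) is required.
    raise ArgumentError if args.size < 1
    # String#to_i yields 0 for input that does not start with a number.
    @m = args[0].to_i
  end
end

numeric = TopMConfig.new("10").m
non_numeric = TopMConfig.new("abc").m
p numeric
p non_numeric
```

Since a value of 0 would leave the pool permanently empty, callers probably want to pass a positive integer or a numeric string.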
Instance Method Details
#declare ⇒ Object
    # File 'lib/mrtoolkit.rb', line 629
    def declare
      field :key
      field :value

      emit :key
      emit :value
    end
#process_begin(dummy, output) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 641
    def process_begin(dummy, output)
      @pool = []
      nil
    end
#process_each(input, output) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 650
    def process_each(input, output)
      @sum += input.value.to_i
      nil
    end
#process_end(dummy, output) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 664
    def process_end(dummy, output)
      output = []
      @pool.each do |elem|
        item = new_output
        item.key, item.value = elem
        output << item
      end
      output
    end
#process_init(input, output) ⇒ Object
These three methods (#process_init, #process_each, #process_term) do the sum. This one resets the running total to zero.

    # File 'lib/mrtoolkit.rb', line 646
    def process_init(input, output)
      @sum = 0
      nil
    end
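How the three summing hooks might cooperate can be sketched with a small driver. This is an assumption about the calling pattern, not ReduceBase's actual code, which is not shown on this page: `RunSummer`, `feed`, `finish`, and `totals` are hypothetical names, and the sketch assumes input arrives grouped by key and that the base class tracks the current key in `@last`.

```ruby
# Hypothetical driver illustrating an init / each / term cycle per run of
# identical keys. Not ReduceBase's implementation.
class RunSummer
  attr_reader :totals

  def initialize
    @last = nil   # key of the run currently being summed
    @totals = []  # finished [key, sum] pairs
  end

  def process_init
    @sum = 0
  end

  def process_each(value)
    @sum += value.to_i
  end

  def process_term
    @totals << [@last, @sum]
  end

  # Feed one record; a key change closes the previous run and opens a new one.
  def feed(key, value)
    if key != @last
      process_term unless @last.nil?
      process_init
      @last = key
    end
    process_each(value)
  end

  # Close the final run after the last record.
  def finish
    process_term unless @last.nil?
    @totals
  end
end

summer = RunSummer.new
[["a", "1"], ["a", "2"], ["b", "10"]].each { |k, v| summer.feed(k, v) }
totals = summer.finish
p totals
```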
#process_term(dummy, output) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 654
    def process_term(dummy, output)
      if @pool.size < @m
        @pool << [@last, @sum]
        sort_pool
      elsif @sum > @pool[-1][1]
        @pool[-1] = [@last, @sum]
        sort_pool
      end
      nil
    end
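The pool update in #process_term can be traced step by step in plain Ruby. In this sketch, `offer` is a hypothetical free-standing helper that combines the branch logic above with the descending sort from #sort_pool; it assumes `m` is at least 1, because with an empty pool and `m` equal to 0 the `pool[-1]` lookup would return nil.

```ruby
# Hypothetical helper mirroring the process_term / sort_pool logic.
# Assumes m >= 1.
def offer(pool, m, key, sum)
  if pool.size < m
    # Pool not full yet: always admit the new entry.
    pool << [key, sum]
  elsif sum > pool[-1][1]
    # Pool full: replace the smallest entry (kept last) if the new sum is larger.
    pool[-1] = [key, sum]
  else
    return pool
  end
  # Keep the pool ordered by sum, largest first.
  pool.sort! { |x, y| y[1] <=> x[1] }
end

pool = []
[["a", 3], ["b", 10], ["c", 5], ["d", 1]].each { |k, s| offer(pool, 2, k, s) }
p pool
```

Keeping the pool sorted means the smallest retained sum is always the last element, so each finished key needs only one comparison before it is admitted or discarded.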
#sort_pool ⇒ Object
    # File 'lib/mrtoolkit.rb', line 637
    def sort_pool
      @pool.sort! {|x, y| y[1] <=> x[1]}
    end