Class: MaxReduce

Inherits:
ReduceBase
Defined in:
lib/mrtoolkit.rb

Overview

Reducer that retains the M maximum values found in column 2 (the value field). Column 2 must be numeric. TODO: store the rest of the fields too.

Direct Known Subclasses

MinReduce

Instance Attribute Summary

Attributes inherited from Stage

#errors, #in_fields, #in_sep, #out_fields, #out_sep

Instance Method Summary

Methods inherited from ReduceBase

#process_each, #process_end_internal, #process_init, #process_internal, #process_term, #run

Methods inherited from Stage

#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out

Constructor Details

#initialize(*args) ⇒ MaxReduce

Returns a new instance of MaxReduce.



571
572
573
574
575
576
577
# File 'lib/mrtoolkit.rb', line 571

# Builds a reducer that keeps the top M values.
#
# @param args [Array] optional; args[0] (converted with #to_i) is M, the
#   number of entries to retain. When args[0] is absent or nil, M is 1.
# NOTE(review): does not call super — confirm Stage/ReduceBase need no
#   initialization of their own.
def initialize(*args)
  limit = args[0]
  @m = limit ? limit.to_i : 1
end

Instance Method Details

#compare(x, y) ⇒ Object



587
588
589
# File 'lib/mrtoolkit.rb', line 587

# Ordering used when sorting the pool: the operands of <=> are swapped, so
# larger values sort first (descending order). Subclasses may override this
# to change the ordering.
#
# @return [Integer, nil] -1, 0, 1, or nil when the values are incomparable
def compare(x, y)
  descending = y <=> x
  descending
end

#declare ⇒ Object



579
580
581
582
583
584
585
# File 'lib/mrtoolkit.rb', line 579

# Declares the stage's input fields and output fields: both are the pair
# (key, value). All inputs are declared first, then all outputs.
#
# @return whatever the final `emit` call returns
def declare
  columns = [:key, :value]
  columns.each { |name| field name }
  columns.map { |name| emit name }.last
end

#process(input, output) ⇒ Object



599
600
601
602
603
604
605
606
607
608
609
# File 'lib/mrtoolkit.rb', line 599

# Offers one record to the bounded pool of retained values.
#
# The pool holds at most @m [key, value] pairs and is kept ordered by
# #compare (via #sort_pool), so the lowest-ranked retained entry is last.
# While the pool has room, the record is always added. Once the pool is
# full, the record replaces the last entry only if #compare ranks it ahead
# of that entry. Deciding via #compare, rather than a hard-coded `>`, keeps
# the eviction rule consistent with the sort order when a subclass
# overrides #compare.
#
# @param input record exposing #key and #value; the value is converted with
#   #to_i, so non-numeric text becomes 0
# @param output unused
# @return [nil] nothing is emitted until #process_end
def process(input, output)
  val = input.value.to_i
  if @pool.size < @m
    @pool << [input.key, val]
    sort_pool
  elsif !@pool.empty? && compare(val, @pool.last[1]) < 0
    # Pool is full: evict the lowest-ranked entry in favour of this record.
    # The empty? guard avoids dereferencing nil when @m <= 0.
    @pool[-1] = [input.key, val]
    sort_pool
  end
  nil
end

#process_begin(dummy, output) ⇒ Object



595
596
597
598
# File 'lib/mrtoolkit.rb', line 595

# Starts a reduce pass by discarding any previously pooled entries.
#
# @param dummy unused
# @param output unused
# @return [nil] nothing is emitted at the start of a pass
def process_begin(dummy, output)
  @pool = []
  nil # no records to emit yet
end

#process_end(dummy, output) ⇒ Object



610
611
612
613
614
615
616
617
618
# File 'lib/mrtoolkit.rb', line 610

# Turns the retained pool into output records, preserving pool order.
#
# @param dummy unused
# @param output unused; the records are returned instead (presumably the
#   caller writes the returned array out — confirm against ReduceBase)
# @return [Array] one fresh #new_output record per pooled [key, value] pair
def process_end(dummy, output)
  @pool.map do |key, value|
    item = new_output
    item.key = key
    item.value = value
    item
  end
end

#sort_pool ⇒ Object



591
592
593
# File 'lib/mrtoolkit.rb', line 591

# Re-orders @pool in place by each entry's value (element 1 of the
# [key, value] pair), using #compare to decide the order.
#
# @return [Array] @pool itself, now sorted
def sort_pool
  @pool.sort! do |(_key_a, value_a), (_key_b, value_b)|
    compare(value_a, value_b)
  end
end