Class: MaxReduce
Overview
Reducer retains the the M maximum values in column 2 Column 2 must be numeric TODO store rest of fields too
Instance Attribute Summary
Attributes inherited from Stage
#errors, #in_fields, #in_sep, #out_fields, #out_sep
Instance Method Summary
collapse
Methods inherited from ReduceBase
#process_each, #process_end_internal, #process_init, #process_internal, #process_term, #run
Methods inherited from Stage
#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out
Constructor Details
#initialize(*args) ⇒ MaxReduce
Returns a new instance of MaxReduce.
571
572
573
574
575
576
577
|
# File 'lib/mrtoolkit.rb', line 571
def initialize(*args)
if args[0]
@m = args[0].to_i
else
@m = 1
end
end
|
Instance Method Details
#compare(x, y) ⇒ Object
587
588
589
|
# File 'lib/mrtoolkit.rb', line 587
def compare(x, y)
y <=> x
end
|
#declare ⇒ Object
579
580
581
582
583
584
585
|
# File 'lib/mrtoolkit.rb', line 579
def declare
field :key
field :value
emit :key
emit :value
end
|
#process(input, output) ⇒ Object
599
600
601
602
603
604
605
606
607
608
609
|
# File 'lib/mrtoolkit.rb', line 599
def process(input, output)
val = input.value.to_i
if @pool.size < @m
@pool << [input.key, val]
sort_pool
elsif val > @pool[-1][1]
@pool[-1] = [input.key, val]
sort_pool
end
nil
end
|
#process_begin(dummy, output) ⇒ Object
595
596
597
598
|
# File 'lib/mrtoolkit.rb', line 595
def process_begin(dummy, output)
@pool = []
nil
end
|
#process_end(dummy, output) ⇒ Object
610
611
612
613
614
615
616
617
618
|
# File 'lib/mrtoolkit.rb', line 610
def process_end(dummy, output)
output = []
@pool.each do |elem|
item = new_output
item.key, item.value = elem
output << item
end
output
end
|
#sort_pool ⇒ Object
591
592
593
|
# File 'lib/mrtoolkit.rb', line 591
def sort_pool
@pool.sort! {|x, y| compare(x[1], y[1])}
end
|