Class: SampleReduce

Inherits:
ReduceBase
Defined in:
lib/mrtoolkit.rb

Overview

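Reducer that draws a random sample of its input using reservoir sampling. One argument must be given: the number of samples to retain. Outputs that many lines (or every line, if the input has fewer). TODO: store the whole input object in the pool, or else take another argument listing the columns to store.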
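The reducer's `process` method follows the shape of reservoir sampling (Algorithm R): the first m values fill a pool, and each later value replaces a randomly chosen slot with a probability that shrinks as more input arrives. The standalone sketch below shows the technique outside mrtoolkit. The function name `reservoir_sample` and its `rng` parameter are hypothetical, chosen only for illustration.

```ruby
# Reservoir sampling (Algorithm R): keep a uniform random sample of k items
# from a stream of unknown length in a single pass and O(k) memory.
# reservoir_sample is a hypothetical helper, not part of mrtoolkit.
def reservoir_sample(stream, k, rng = Random.new)
  pool = []
  seen = 0
  stream.each do |item|
    seen += 1                         # items seen so far, including this one
    if pool.size < k
      pool << item                    # fill the reservoir first
    elsif rng.rand < k.fdiv(seen)     # keep this item with probability k/seen
      pool[rng.rand(k)] = item        # overwrite a uniformly chosen slot
    end
  end
  pool
end

sample = reservoir_sample(1..1_000, 5, Random.new(42))
p sample.size  # => 5
```

Passing a seeded `Random` makes runs reproducible, which can help when testing a job. When the stream is shorter than `k`, every item is returned.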

Instance Attribute Summary

Attributes inherited from Stage

#errors, #in_fields, #in_sep, #out_fields, #out_sep

Instance Method Summary

Methods inherited from ReduceBase

#process_each, #process_end_internal, #process_init, #process_internal, #process_term, #run

Methods inherited from Stage

#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out

Constructor Details

#initialize(*args) ⇒ SampleReduce

Returns a new instance of SampleReduce.

Raises:

  • (ArgumentError) if no arguments are given.

# File 'lib/mrtoolkit.rb', line 532

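def initialize(*args)
  # args[0] is the number of samples to retain
  raise ArgumentError, "SampleReduce requires the number of samples to retain" if args.size < 1
  @m = args[0].to_i
end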
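`args[0].to_i` accepts either an Integer or a numeric string (for example one taken from a command line), but `String#to_i` never fails: non-numeric text becomes 0, so a mistyped sample count silently yields an empty sample. The sketch below contrasts that with a strict parse via `Kernel#Integer`. Both helper names are hypothetical and not part of mrtoolkit.

```ruby
# Hypothetical helpers (not part of mrtoolkit) contrasting the constructor's
# lenient to_i conversion with a strict Kernel#Integer parse.
def lenient_count(arg)
  arg.to_i                  # what SampleReduce#initialize does
end

def strict_count(arg)
  Integer(arg.to_s, 10)     # base 10; raises ArgumentError on non-numeric text
end

p lenient_count("100")   # => 100
p lenient_count("1e3")   # => 1
p lenient_count("ten")   # => 0
p strict_count("100")    # => 100
begin
  strict_count("ten")
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```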

Instance Method Details

#declare ⇒ Object

# File 'lib/mrtoolkit.rb', line 537

def declare
  field :value   # input: a single field named value

  emit :value    # output: the same single field
end

#process(input, output) ⇒ Object

# File 'lib/mrtoolkit.rb', line 547

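def process(input, output)
  @n += 1                            # items seen so far, including this one
  if @pool.size < @m
    @pool << input.value             # fill the reservoir first
  elsif rand < (@m.to_f / @n)        # keep this item with probability m/n
    @pool[rand(@m)] = input.value    # overwrite a uniformly chosen slot
  end
  nil
end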
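Why the acceptance probability matters: in Algorithm R, the i-th item (counting from 1, including the current item) is accepted with probability m/i, and that rule is what makes every item equally likely to end up in the sample. The sketch below checks this with exact `Rational` arithmetic instead of random trials. `inclusion_probability` is a hypothetical helper written for this illustration.

```ruby
# Exact probability that item i (1-based) of an n-item stream is in the final
# reservoir of size m, when item j > m is accepted with probability m/j and,
# if accepted, overwrites one of the m slots chosen uniformly at random.
def inclusion_probability(i, m, n)
  # Probability that item i enters the reservoir at all.
  enter = i <= m ? Rational(1) : Rational(m, i)
  # It must then survive every later step j. It is evicted only if item j is
  # accepted (m/j) and picks its slot (1/m), i.e. with probability 1/j.
  first_later = [i, m].max + 1
  survive = (first_later..n).reduce(Rational(1)) { |acc, j| acc * (1 - Rational(1, j)) }
  enter * survive
end

m, n = 3, 10
probs = (1..n).map { |i| inclusion_probability(i, m, n) }
p probs.uniq  # => [(3/10)]
```

Every item lands in the sample with probability m/n. Dividing instead by the number of items seen *before* the current one (m/(i-1)) makes item m+1 always accepted, and the same kind of calculation then gives (m-1)/(n-1) for each of the first m items but m/(n-1) for every later one.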

#process_begin(dummy, output) ⇒ Object

# File 'lib/mrtoolkit.rb', line 542

def process_begin(dummy, output)
  @pool = []   # retained sample values
  @n = 0       # number of input items seen so far
  nil
end

#process_end(dummy, output) ⇒ Object

# File 'lib/mrtoolkit.rb', line 556

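def process_end(dummy, output)
  # Wrap each retained value in its own output record. The returned array
  # holds at most @m records (fewer if the input had fewer lines).
  @pool.map do |elem|
    item = new_output
    item.value = elem
    item
  end
end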