Class: SampleReduce
- Inherits:
-
ReduceBase
- Object
- Stage
- ReduceBase
- SampleReduce
- Defined in:
- lib/mrtoolkit.rb
Overview
Reducer samples the input One argument must be given: the number of samples to retain Outputs that many lines TODO store the whole input object in pool? or else take another argument of columns to store
Instance Attribute Summary
Attributes inherited from Stage
#errors, #in_fields, #in_sep, #out_fields, #out_sep
Instance Method Summary collapse
- #declare ⇒ Object
-
#initialize(*args) ⇒ SampleReduce
constructor
A new instance of SampleReduce.
- #process(input, output) ⇒ Object
- #process_begin(dummy, output) ⇒ Object
- #process_end(dummy, output) ⇒ Object
Methods inherited from ReduceBase
#process_each, #process_end_internal, #process_init, #process_internal, #process_term, #run
Methods inherited from Stage
#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out
Constructor Details
#initialize(*args) ⇒ SampleReduce
Returns a new instance of SampleReduce.
532 533 534 535 |
# File 'lib/mrtoolkit.rb', line 532 def initialize(*args) raise ArgumentError if args.size < 1 @m = args[0].to_i end |
Instance Method Details
#declare ⇒ Object
537 538 539 540 541 |
# File 'lib/mrtoolkit.rb', line 537 def declare field :value emit :value end |
#process(input, output) ⇒ Object
547 548 549 550 551 552 553 554 555 |
# File 'lib/mrtoolkit.rb', line 547 def process(input, output) if @pool.size < @m @pool << input.value elsif rand < (@m.to_f / @n.to_f) @pool[rand(@m)] = input.value end @n += 1 nil end |
#process_begin(dummy, output) ⇒ Object
542 543 544 545 546 |
# File 'lib/mrtoolkit.rb', line 542 def process_begin(dummy, output) @pool = [] @n = 0 nil end |
#process_end(dummy, output) ⇒ Object
556 557 558 559 560 561 562 563 564 |
# File 'lib/mrtoolkit.rb', line 556 def process_end(dummy, output) output = [] @pool.each do |elem| item = new_output item.value = elem output << item end output end |