Class: UniqueSumReduce

Inherits:
ReduceBase show all
Defined in:
lib/mrtoolkit.rb

Overview

This reducer sums within each unique value of the first field. Outputs one line of sums for each unique value of the first field.

Instance Attribute Summary

Attributes inherited from Stage

#errors, #in_fields, #in_sep, #out_fields, #out_sep

Instance Method Summary collapse

Methods inherited from ReduceBase

#process, #process_begin, #process_end, #process_end_internal, #process_internal, #run

Methods inherited from Stage

#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out

Constructor Details

#initialize(*args) ⇒ UniqueSumReduce

Returns a new instance of UniqueSumReduce.



379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/mrtoolkit.rb', line 379

def initialize(*args)
  if args[0]
    @n = args[0].to_i - 1
  else
    @n = 0
  end
  if args[1]
    @m = args[1].to_i - 1
  else
    @m = -1
  end
end

Instance Method Details

#declareObject



392
393
394
395
396
397
398
399
400
# File 'lib/mrtoolkit.rb', line 392

def declare
  field :unique
  (0..@n).each {|i| field "count#{i}"}
  (0..@m).each {|i| field "extra#{i}"}

  emit :value
  (0..@n).each {|i| emit "sum#{i}"}
  (0..@m).each {|i| emit "extra#{i}"}
end

#process_each(input, output) ⇒ Object



406
407
408
409
410
# File 'lib/mrtoolkit.rb', line 406

def process_each(input, output)
  (0..@n).each {|i| @sum[i] += input[i+1].to_i}
  (0..@m).each {|i| @extra[i] = input[i+@n+2]}
  nil
end

#process_init(input, output) ⇒ Object



401
402
403
404
405
# File 'lib/mrtoolkit.rb', line 401

def process_init(input, output)
  @sum = Array.new(@n+1, 0)
  @extra = Array.new(@m+1)
  nil
end

#process_term(dummy, output) ⇒ Object



411
412
413
414
415
416
# File 'lib/mrtoolkit.rb', line 411

def process_term(dummy, output)
  output.value = @last
  (0..@n).each {|i| output[i+1] = @sum[i]}
  (0..@m).each {|i| output[i+@n+2] = @extra[i]}
  output
end