Class: UniqueCountReduce

Inherits:
ReduceBase show all
Defined in:
lib/mrtoolkit.rb

Overview

Reducer that counts the input records for each unique value of the first field. Outputs one line for each unique value of the first field, containing that value and its count.

Instance Attribute Summary

Attributes inherited from Stage

#errors, #in_fields, #in_sep, #out_fields, #out_sep

Instance Method Summary collapse

Methods inherited from ReduceBase

#process, #process_begin, #process_end, #process_end_internal, #process_internal, #run

Methods inherited from Stage

#catch_errors, #copy_struct, #emit, #emit_separator, #field, #field_separator, #new_input, #new_output, #prepare, #process_step, #write_out

Constructor Details

#initialize(*args) ⇒ UniqueCountReduce

Returns a new instance of UniqueCountReduce.



422
423
424
425
426
427
428
# File 'lib/mrtoolkit.rb', line 422

def initialize(*args)
  if args[0]
    @m = args[0].to_i - 1
  else
    @m = -1
  end
end

Instance Method Details

#declare ⇒ Object



430
431
432
433
434
435
436
437
# File 'lib/mrtoolkit.rb', line 430

def declare
  field :unique
  (0..@m).each {|i| field "extra#{i}"}

  emit :value
  emit :count
  (0..@m).each {|i| emit "extra#{i}"}
end

#process_each(input, output) ⇒ Object



443
444
445
446
447
# File 'lib/mrtoolkit.rb', line 443

def process_each(input, output)
  @count += 1
  (0..@m).each {|i| @extra[i] = input[i+1]}
  nil
end

#process_init(input, output) ⇒ Object



438
439
440
441
442
# File 'lib/mrtoolkit.rb', line 438

def process_init(input, output)
  @count = 0
  @extra = Array.new(@m+1)
  nil
end

#process_term(dummy, output) ⇒ Object



448
449
450
451
452
453
# File 'lib/mrtoolkit.rb', line 448

def process_term(dummy, output)
  output.value = @last
  output.count = @count
  (0..@m).each {|i| output[i+2] = @extra[i]}
  output
end