Class: ReduceBase

Inherits:
Stage
  • Object
Defined in:
lib/mrtoolkit.rb

Overview

Base class for reduce stages.

Reduce stage. Creates an object to hold each input line after it has been parsed and separated into fields, reads the input and feeds each record to the process methods, then collects and writes out their output. Reduce input is the output of the map stage.
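As an illustration of "parsed and separated into fields", the following sketch turns one map-output line into a Struct. The tab separator, the Record struct, and its word/count fields are assumptions made for this example; mrtoolkit derives the separator from in_sep and the layout from the stage's own field declarations, whose details are not shown on this page. Because the record is a Struct, input[0] returns the first field, which is the value the run-grouping logic compares.

```ruby
# Hypothetical record layout; mrtoolkit builds its own from declared fields.
Record = Struct.new(:word, :count)

# Assumption: fields are tab-separated, as in Hadoop streaming's default.
IN_SEP = "\t"

# Split one map-output line into a Record; extra fields are dropped and
# missing trailing fields are left nil.
def parse_line(line)
  fields = line.chomp.split(IN_SEP)
  Record.new(*fields.first(Record.members.size))
end

rec = parse_line("apple\t3\n")
puts rec.word   # the first field by name
puts rec[0]     # the same field by position
```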

Instance Attribute Summary

Attributes inherited from Stage

#errors, #in_fields, #in_sep, #out_fields, #out_sep

Instance Method Summary

Methods inherited from Stage

#catch_errors, #copy_struct, #declare, #emit, #emit_separator, #field, #field_separator, #initialize, #new_input, #new_output, #prepare, #process_step, #write_out

Constructor Details

This class inherits a constructor from Stage

Instance Method Details

#process(input, output) ⇒ Object

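Called once for each input record, after process_internal has dispatched the run callbacks for it. The default implementation does nothing.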



# File 'lib/mrtoolkit.rb', line 179

def process(input, output)
  nil
end
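A minimal sketch of a reducer that implements only process, assuming records are plain Structs and output is an Array of lines. FrequentWords and drive are hypothetical names; in mrtoolkit the output object and the dispatch through process_step come from Stage and differ from this stand-in.

```ruby
Record = Struct.new(:word, :count)

# Hypothetical reducer that implements only #process.
# Output is modeled as an Array of lines; mrtoolkit's real output object differs.
class FrequentWords
  def process(input, output)
    output << "#{input.word}\t#{input.count}" if input.count.to_i >= 2
    nil
  end
end

# Minimal stand-in for the per-record part of ReduceBase#run.
def drive(reducer, lines)
  output = []
  lines.each do |line|
    input = Record.new(*line.chomp.split("\t"))
    reducer.process(input, output)
  end
  output
end

result = drive(FrequentWords.new, ["apple\t3\n", "fig\t1\n", "pear\t2\n"])
puts result
```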

#process_begin(dummy, output) ⇒ Object

Called once at the beginning of the reduction, before any input is read. No input record is passed.



# File 'lib/mrtoolkit.rb', line 175

def process_begin(dummy, output)
  nil
end
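The sketch below, under the same simplifying assumptions (hypothetical WithHeader class and drive helper, Array output), shows a typical use of process_begin: writing a header exactly once, even when there is no input.

```ruby
# Hypothetical reducer: writes a header once, before any record is read.
class WithHeader
  def process_begin(dummy, output)
    # dummy is nil: no record has been read yet
    output << "word\tcount"
  end

  def process(input, output)
    output << input.join("\t")
  end
end

# Stand-in for the begin/process part of ReduceBase#run (simplified).
def drive(reducer, lines)
  output = []
  reducer.process_begin(nil, output)
  lines.each { |line| reducer.process(line.chomp.split("\t"), output) }
  output
end

p drive(WithHeader.new, [])
p drive(WithHeader.new, ["apple\t3\n"])
```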

#process_each(input, output) ⇒ Object

Called for each record in a run of consecutive records that share the same value in the first field, including the first record of the run.



# File 'lib/mrtoolkit.rb', line 164

def process_each(input, output)
  nil
end
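A sketch of process_each, assuming input sorted by the first field: the hypothetical RankWithinKey reducer resets a counter in process_init and numbers each record in process_each, so the first record of every run is numbered 1. The drive helper is also hypothetical and reproduces only the init/each ordering of process_internal.

```ruby
# Hypothetical reducer: numbers the records within each run of equal keys.
class RankWithinKey
  def process_init(input, output)
    @rank = 0                        # a new run starts
  end

  def process_each(input, output)
    @rank += 1                       # runs for every record, first included
    output << "#{input[0]}\t#{@rank}\t#{input[1]}"
  end
end

# Simplified driver: open a run when the key changes, then call
# process_each, in the same order process_internal uses.
def drive(reducer, records)
  output = []
  last = nil
  records.each do |rec|
    reducer.process_init(rec, output) if last.nil? || rec[0] != last
    last = rec[0]
    reducer.process_each(rec, output)
  end
  output
end

result = drive(RankWithinKey.new, [%w[apple 3], %w[apple 5], %w[fig 1]])
p result
```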

#process_end(dummy, output) ⇒ Object

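Called once at the end of the reduction, after all input has been read and the final run has been closed. No input record is passed.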



# File 'lib/mrtoolkit.rb', line 183

def process_end(dummy, output)
  nil
end
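Together, process_begin, process, and process_end are enough for a whole-input aggregate. This hypothetical GrandTotal reducer sums the second field of every record; as in the other sketches, drive and the Array output are stand-ins for Stage's machinery, not mrtoolkit API.

```ruby
# Hypothetical reducer: sums the second field over all records.
class GrandTotal
  def process_begin(dummy, output)
    @total = 0
  end

  def process(input, output)
    @total += input[1].to_i
  end

  def process_end(dummy, output)
    output << "total\t#{@total}"   # runs once, after the last record
  end
end

# Simplified stand-in for the begin/process/end calls made by ReduceBase#run.
def drive(reducer, records)
  output = []
  reducer.process_begin(nil, output)
  records.each { |rec| reducer.process(rec, output) }
  reducer.process_end(nil, output)
  output
end

p drive(GrandTotal.new, [%w[apple 3], %w[apple 5], %w[fig 1]])
```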

#process_end_internal(dummy) ⇒ Object



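Called by run after the last input line has been processed. If any record was read (@last is set), it calls process_term to close the final run of equal values.
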
# File 'lib/mrtoolkit.rb', line 215

def process_end_internal(dummy)
  process_step(:process_term, nil) if @last
end

#process_init(input, output) ⇒ Object

Called at the beginning of each run of consecutive records that share the same value in the first field. Receives the first record of the run.



# File 'lib/mrtoolkit.rb', line 160

def process_init(input, output)
  nil
end
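Because process_init receives the first record of each run, it alone can implement a keep-first-per-key reducer. FirstPerKey and drive are hypothetical; the driver opens a run under the same condition as process_internal (no previous key, or a changed key) and assumes input sorted by the first field.

```ruby
# Hypothetical reducer: keeps only the first record of each run of equal keys.
class FirstPerKey
  def process_init(input, output)
    output << input.join("\t")
  end
end

# Simplified driver: calls process_init whenever the first field changes,
# which is when ReduceBase#process_internal calls it.
def drive(reducer, records)
  output = []
  last = nil
  records.each do |rec|
    reducer.process_init(rec, output) if last.nil? || rec[0] != last
    last = rec[0]
  end
  output
end

result = drive(FirstPerKey.new, [%w[apple 3], %w[apple 5], %w[fig 1], %w[pear 2]])
p result
```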

#process_internal(input) ⇒ Object

This suite of callbacks is called on all records. process_begin is called first, then process is called on each record, and process_end is called last. This default implementation of process_internal, which run calls on each record just before process, groups consecutive records by their first field and calls process_init, process_each, and process_term. A client can omit process_begin and process_end and implement only process to see each record.



# File 'lib/mrtoolkit.rb', line 195

def process_internal(input)
  v = input[0]
  if @last.nil?
    process_step(:process_init, input)
    process_step(:process_each, input)
    @last = v
    return
  end
  if v == @last
    # As long as key is the same, just process it
    process_step(:process_each, input)
    return
  end
  # The run has ended
  process_step(:process_term, input) if @last
  @last = v

  process_step(:process_init, input)
  process_step(:process_each, input)
end
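To check the call order, here is a standalone copy of process_internal and process_end_internal that records each callback name and the first field of its argument instead of calling process_step. GroupingTrace is a hypothetical test harness, not part of mrtoolkit. The trace also shows that process_term receives the first record of the next run when a run ends mid-input, and nil after the final run.

```ruby
# Standalone copy of the grouping logic; process_step is replaced by
# appending [callback name, first field of the argument] to @trace.
class GroupingTrace
  attr_reader :trace

  def initialize
    @trace = []
    @last = nil
  end

  def step(name, input)
    @trace << [name, input && input[0]]
  end

  def process_internal(input)
    v = input[0]
    if @last.nil?               # very first record: open a run
      step(:process_init, input)
      step(:process_each, input)
      @last = v
      return
    end
    if v == @last               # same key: continue the run
      step(:process_each, input)
      return
    end
    step(:process_term, input)  # key changed: close the old run...
    @last = v
    step(:process_init, input)  # ...and open a new one
    step(:process_each, input)
  end

  def process_end_internal(dummy)
    step(:process_term, nil) if @last
  end
end

t = GroupingTrace.new
[%w[a 1], %w[a 2], %w[b 3]].each { |rec| t.process_internal(rec) }
t.process_end_internal(nil)
t.trace.each { |name, key| puts "#{name} #{key.inspect}" }
```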

#process_term(dummy, output) ⇒ Object

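Called after a run of equal values ends. The first-field value of the run that just ended is in @last. The argument is not a record of that run: it is the first record of the next run when the run ends mid-input, and nil after the final run.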



# File 'lib/mrtoolkit.rb', line 169

def process_term(dummy, output)
  nil
end
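A per-key word-count sketch that reads the key from @last in process_term, as described above. WordCount and its feed method are hypothetical; feed compresses the logic of process_internal and process_end_internal into one loop, but passes nil to process_term in both cases and assumes input sorted by the first field.

```ruby
# Hypothetical word-count reducer: sums the second field per key and reads
# the finished key from @last in process_term.
class WordCount
  def initialize
    @last = nil
  end

  def process_init(input, output)
    @sum = 0
  end

  def process_each(input, output)
    @sum += input[1].to_i
  end

  def process_term(dummy, output)
    output << "#{@last}\t#{@sum}"   # @last still holds the key of the run that ended
  end

  # Simplified stand-in for run/process_internal/process_end_internal.
  def feed(records)
    output = []
    records.each do |rec|
      changed = !@last.nil? && rec[0] != @last
      process_term(nil, output) if changed
      process_init(rec, output) if @last.nil? || changed
      @last = rec[0]
      process_each(rec, output)
    end
    process_term(nil, output) if @last   # close the final run
    output
  end
end

counts = WordCount.new.feed([%w[apple 3], %w[apple 5], %w[fig 1]])
p counts
```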

#run(in_fd, out_fd) ⇒ Object

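Runs the reducer. Calls process_begin; then, for each input line, parses the line into fields and calls process_internal (which drives process_init, process_each, and process_term) followed by process; finally calls process_end_internal, which closes the last run, and then process_end. At each step, any output is collected and written out.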



# File 'lib/mrtoolkit.rb', line 223

def run(in_fd, out_fd)
  @out_fd = out_fd
  @last = nil
  process_step(:process_begin, nil)

  input = nil			# so it will survive the loop
  in_fd.each_line do |line|
    @raw_input = line
    input = new_input(line)
    process_internal(input)
    process_step(:process, input)
  end
  process_end_internal(nil)
  process_step(:process_end, nil)
end
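The overall order in which run invokes the callbacks can be traced with a simplified model. RunOrderTrace is hypothetical: it assumes tab-separated fields, logs each hook by name instead of routing it through process_step, and writes the log to out_fd at the end. For the input "a\t1\na\t2\nb\t3\n", the log starts with process_begin, shows process called after process_each for every record, closes run a before opening run b, and ends with the final process_term followed by process_end.

```ruby
require "stringio"

# Simplified, hypothetical model of ReduceBase#run's call order. Every hook
# just appends its name to @log; the log is written to out_fd at the end.
class RunOrderTrace
  def initialize
    @log = []
  end

  def run(in_fd, out_fd)
    @last = nil
    @log << "process_begin"
    in_fd.each_line do |line|
      input = line.chomp.split("\t")   # assumption: tab-separated fields
      process_internal(input)
      @log << "process #{input[0]}"
    end
    @log << "process_term #{@last}" if @last   # what process_end_internal does
    @log << "process_end"
    out_fd.puts(@log)
  end

  private

  # Same branching as ReduceBase#process_internal, logging instead of dispatching.
  def process_internal(input)
    v = input[0]
    @log << "process_term #{@last}" if !@last.nil? && v != @last
    @log << "process_init #{v}" if @last.nil? || v != @last
    @log << "process_each #{v}"
    @last = v
  end
end

out = StringIO.new
RunOrderTrace.new.run(StringIO.new("a\t1\na\t2\nb\t3\n"), out)
puts out.string
```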