Class: ReduceBase
Overview
Base class for reduce stages.
Reduce stage. Creates an object to hold input lines after they have been parsed and separated into fields, reads the input and feeds each record to the process method, then collects the output. Reduce input is map output.
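The flow described above (split each input line into fields, hand the fields to a process method, collect whatever comes back) can be sketched in plain Ruby. This is a minimal, hypothetical stand-in, not the mrtoolkit Stage API: the class names MiniReduceStage and LengthReduce are invented, and tab-separated fields are an assumption.

```ruby
# Hypothetical sketch of the reduce-stage flow; not the mrtoolkit API.
# Assumes tab-separated fields, as in typical Hadoop streaming output.
class MiniReduceStage
  def initialize(sep = "\t")
    @sep = sep
  end

  # Split each line into fields, pass the fields to #process, and
  # collect every non-nil result as an output line.
  def run(lines)
    lines.each_with_object([]) do |line, out|
      fields = line.chomp.split(@sep)
      result = process(fields)
      out << result.join(@sep) unless result.nil?
    end
  end

  # Default hook: emit nothing. Subclasses override it.
  def process(fields)
    nil
  end
end

# Hypothetical subclass: emit the key and the length of the value field.
class LengthReduce < MiniReduceStage
  def process(fields)
    [fields[0], fields[1].length.to_s]
  end
end

p LengthReduce.new.run(["a\tfoo\n", "b\tqz\n"])  # => ["a\t3", "b\t2"]
```

As in the real base class, the default process emits nothing, so a subclass only overrides the hooks it needs.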
Direct Known Subclasses
CopyReduce, MaxReduce, MaxUniqueSumReduce, SampleReduce, SumReduce, UniqueCountReduce, UniqueFirstReduce, UniqueIndexedCountReduce, UniqueIndexedSumReduce, UniqueReduce, UniqueSumReduce
Instance Attribute Summary
Attributes inherited from Stage
#errors, #in_fields, #in_sep, #out_fields, #out_sep
Instance Method Summary
- #process(input, output) ⇒ Object
  Called for each record.
- #process_begin(dummy, output) ⇒ Object
  Called at the beginning of reduction.
- #process_each(input, output) ⇒ Object
  Called for each record in a run of equal values of the first field.
- #process_end(dummy, output) ⇒ Object
  Called at the end of reduction.
- #process_end_internal(dummy) ⇒ Object
  Closes the final run by calling process_term.
- #process_init(input, output) ⇒ Object
  Called at the beginning of a run of equal values of the first field.
- #process_internal(input) ⇒ Object
  Called on every record; drives process_init, process_each, and process_term.
- #process_term(dummy, output) ⇒ Object
  Called after a run of equal values ends.
- #run(in_fd, out_fd) ⇒ Object
  Run the reducer.
Methods inherited from Stage
#catch_errors, #copy_struct, #declare, #emit, #emit_separator, #field, #field_separator, #initialize, #new_input, #new_output, #prepare, #process_step, #write_out
Constructor Details
This class inherits a constructor from Stage
Instance Method Details
#process(input, output) ⇒ Object
Called for each record.
# File 'lib/mrtoolkit.rb', line 179
def process(input, output)
  nil
end
#process_begin(dummy, output) ⇒ Object
Called at the beginning of reduction. No input.
# File 'lib/mrtoolkit.rb', line 175
def process_begin(dummy, output)
  nil
end
#process_each(input, output) ⇒ Object
Called for each record in a run of equal values of the first field.
# File 'lib/mrtoolkit.rb', line 164
def process_each(input, output)
  nil
end
#process_end(dummy, output) ⇒ Object
Called at the end of reduction.
# File 'lib/mrtoolkit.rb', line 183
def process_end(dummy, output)
  nil
end
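process_begin and process_end suit whole-input bookkeeping, since each is called exactly once. The sketch below is a hypothetical stand-in, not mrtoolkit code: CountAll is an invented name, and instead of writing to an output argument each hook returns an output record (an array of fields) or nil, which is an assumption made to keep the example self-contained.

```ruby
# Hypothetical sketch; not the mrtoolkit API. Each hook returns an
# output record (an array of fields) or nil for "no output".
class CountAll
  def process_begin(_dummy)
    @count = 0
    nil
  end

  # Called once per record; only updates state here.
  def process(_input)
    @count += 1
    nil
  end

  # Emit a single total once all records have been seen.
  def process_end(_dummy)
    ["total", @count]
  end

  # Simplified driver: begin once, process per record, end once.
  def run(records)
    out = [process_begin(nil)]
    records.each { |r| out << process(r) }
    out << process_end(nil)
    out.compact
  end
end

p CountAll.new.run([["a", 1], ["a", 2], ["b", 5]])  # => [["total", 3]]
```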
#process_end_internal(dummy) ⇒ Object
Closes the final run: calls process_term if any record has been seen (that is, if @last is set).
# File 'lib/mrtoolkit.rb', line 215
def process_end_internal(dummy)
  process_step(:process_term, nil) if @last
end
#process_init(input, output) ⇒ Object
Called at the beginning of a run of equal values of the first field.
# File 'lib/mrtoolkit.rb', line 160
def process_init(input, output)
  nil
end
#process_internal(input) ⇒ Object
This suite of functions is called on all records: process_begin is called first, then process is called on each record, and process_end is called last. This default implementation of process_internal drives the calls to process_init, process_each, and process_term, using the first field of each record (input[0]) as the key. A client can omit process_begin and process_end and implement only process to see each record.
# File 'lib/mrtoolkit.rb', line 195
def process_internal(input)
  v = input[0]
  if @last.nil?
    process_step(:process_init, input)
    process_step(:process_each, input)
    @last = v
    return
  end
  if v == @last
    # As long as key is the same, just process it
    process_step(:process_each, input)
    return
  end
  # The run has ended
  process_step(:process_term, input) if @last
  @last = v
  process_step(:process_init, input)
  process_step(:process_each, input)
end
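The run-detection logic of process_internal can be traced in isolation. This hypothetical sketch mirrors process_internal and process_end_internal but logs each hook as [hook, key] instead of dispatching through process_step; RunTracer and its step helper are invented names. It assumes the input is sorted by its first field, as map output reaching a reducer normally is.

```ruby
# Hypothetical mirror of process_internal / process_end_internal that
# logs each hook call as [hook, first_field] instead of calling it.
class RunTracer
  attr_reader :log

  def initialize
    @last = nil
    @log = []
  end

  def process_internal(input)
    v = input[0]
    if @last.nil?                      # very first record
      step(:process_init, input)
      step(:process_each, input)
      @last = v
      return
    end
    if v == @last                      # same key: extend the current run
      step(:process_each, input)
      return
    end
    # Key changed: close the old run. The original passes the new record
    # here; this sketch logs the old key, which is still held in @last.
    step(:process_term, [@last])
    @last = v
    step(:process_init, input)
    step(:process_each, input)
  end

  # Close the final run, as process_end_internal does.
  def process_end_internal
    step(:process_term, [@last]) if @last
  end

  private

  def step(hook, record)
    @log << [hook, record[0]]
  end
end

t = RunTracer.new
[["a", 1], ["a", 2], ["b", 3]].each { |r| t.process_internal(r) }
t.process_end_internal
p t.log
```

For keys a, a, b the log reads init a, each a, each a, term a, init b, each b, term b; the last term comes only from the end-of-input flush.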
#process_term(dummy, output) ⇒ Object
Called after each run of equal values ends. Do not rely on the first argument as an input record; the first-field value of the run that just ended is available in @last.
# File 'lib/mrtoolkit.rb', line 169
def process_term(dummy, output)
  nil
end
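A per-key aggregate falls out of the three run hooks: reset state in process_init, accumulate in process_each, and emit in process_term using the key in @last. The sketch below is hypothetical (KeySum is an invented name and is not mrtoolkit's SumReduce), and its driver groups consecutive equal keys with Ruby's Enumerable#chunk_while rather than reproducing the @last state machine, which yields the same init/each/term sequence for this purpose.

```ruby
# Hypothetical per-key sum built on the init/each/term hooks; not
# mrtoolkit's SumReduce. Records are [key, value] arrays.
class KeySum
  attr_reader :output

  def initialize
    @output = []
  end

  def process_init(_input)
    @sum = 0
  end

  def process_each(input)
    @sum += input[1]
  end

  # @last holds the key of the run that just ended.
  def process_term(_dummy)
    @output << [@last, @sum]
  end

  # Simplified driver: group consecutive records with equal keys and
  # call init once, each per record, and term once per group.
  def run(records)
    records.chunk_while { |a, b| a[0] == b[0] }.each do |group|
      @last = group.first[0]
      process_init(group.first)
      group.each { |r| process_each(r) }
      process_term(nil)
    end
    output
  end
end

p KeySum.new.run([["a", 1], ["a", 2], ["b", 5]])  # => [["a", 3], ["b", 5]]
```

Because only consecutive equal keys form a run, unsorted input such as a, b, a produces three separate runs; this is why the reducer relies on sorted map output.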
#run(in_fd, out_fd) ⇒ Object
Run the reducer: call process_begin, then for each input line call process_internal followed by process, then call process_end_internal and process_end. At each step, collect any output and write it out.
# File 'lib/mrtoolkit.rb', line 223
def run(in_fd, out_fd)
  @out_fd = out_fd
  @last = nil
  process_step(:process_begin, nil)
  input = nil # so it will survive the loop
  in_fd.each_line do |line|
    @raw_input = line
    input = new_input(line)
    process_internal(input)
    process_step(:process, input)
  end
  process_end_internal(nil)
  process_step(:process_end, nil)
end
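The call order that run produces can be checked with a stub. In this hypothetical sketch, RunOrder is an invented name, splitting on tabs stands in for new_input (an assumption about the field separator), and every hook only records its name. It makes visible that process_internal runs before process for each line, and that process_end_internal, which closes the last run, fires before process_end.

```ruby
require "stringio"

# Hypothetical mirror of ReduceBase#run that records the order of calls
# instead of doing real work; not the mrtoolkit implementation.
class RunOrder
  attr_reader :calls

  def initialize
    @calls = []
  end

  def run(in_fd)
    @calls << :process_begin
    in_fd.each_line do |line|
      input = line.chomp.split("\t")   # stand-in for new_input (tab-separated assumed)
      @calls << [:process_internal, input[0]]
      @calls << [:process, input[0]]
    end
    @calls << :process_end_internal
    @calls << :process_end
  end
end

r = RunOrder.new
r.run(StringIO.new("a\t1\nb\t2\n"))
p r.calls
```

One consequence of this order is that a subclass's process_end can assume process_term has already handled the final run.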