Class: Stage
- Inherits:
- Object
  - Object
    - Stage
- Defined in:
- lib/mrtoolkit.rb
Overview
Stores information about a processing stage, including input and output field names, field separators, and the filenames processed by the stage.
Direct Known Subclasses
Instance Attribute Summary
- #errors ⇒ Object (readonly): Returns the value of attribute errors.
- #in_fields ⇒ Object (readonly): Returns the value of attribute in_fields.
- #in_sep ⇒ Object (readonly): Returns the value of attribute in_sep.
- #out_fields ⇒ Object (readonly): Returns the value of attribute out_fields.
- #out_sep ⇒ Object (readonly): Returns the value of attribute out_sep.
Instance Method Summary
- #catch_errors ⇒ Object
- #copy_struct(src, dest, skip = 0) ⇒ Object: Copies all fields of a struct to another. Some fields can be skipped.
- #declare ⇒ Object
- #emit(name) ⇒ Object
- #emit_separator(sep) ⇒ Object
- #field(name) ⇒ Object
- #field_separator(sep) ⇒ Object
- #initialize(*args) ⇒ Stage (constructor): A new instance of Stage.
- #new_input(line = nil) ⇒ Object
- #new_output ⇒ Object
- #prepare ⇒ Object: Create the input and output structures.
- #process_step(fun, input = nil) ⇒ Object: Process one line of a map or reduce file.
- #write_out(output) ⇒ Object: Write any output.
Constructor Details
#initialize(*args) ⇒ Stage
Returns a new instance of Stage.
    # File 'lib/mrtoolkit.rb', line 12
    def initialize(*args)
    end
Instance Attribute Details
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
    # File 'lib/mrtoolkit.rb', line 10
    def errors
      @errors
    end
#in_fields ⇒ Object (readonly)
Returns the value of attribute in_fields.
    # File 'lib/mrtoolkit.rb', line 8
    def in_fields
      @in_fields
    end
#in_sep ⇒ Object (readonly)
Returns the value of attribute in_sep.
    # File 'lib/mrtoolkit.rb', line 9
    def in_sep
      @in_sep
    end
#out_fields ⇒ Object (readonly)
Returns the value of attribute out_fields.
    # File 'lib/mrtoolkit.rb', line 8
    def out_fields
      @out_fields
    end
#out_sep ⇒ Object (readonly)
Returns the value of attribute out_sep.
    # File 'lib/mrtoolkit.rb', line 9
    def out_sep
      @out_sep
    end
Instance Method Details
#catch_errors ⇒ Object
    # File 'lib/mrtoolkit.rb', line 29
    def catch_errors
      @catch_errors = true
    end
#copy_struct(src, dest, skip = 0) ⇒ Object
Copies the fields of one struct into another. The first skip fields of src are skipped.
    # File 'lib/mrtoolkit.rb', line 47
    def copy_struct(src, dest, skip = 0)
      (0..src.length-1-skip).each {|i| dest[i] = src[i+skip]}
      dest
    end
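As a minimal sketch of the skip behaviour, the snippet below repeats the method body as a standalone function and applies it to two hypothetical struct types (`src_type` and `dest_type` are illustrative names, not part of mrtoolkit). It suggests that `skip` drops the leading fields of `src` and copies the remainder into `dest` starting at index 0.

```ruby
# Standalone copy of the method body from the listing, for illustration only.
def copy_struct(src, dest, skip = 0)
  (0..src.length - 1 - skip).each { |i| dest[i] = src[i + skip] }
  dest
end

# Hypothetical struct types; the field names are assumptions for this example.
src_type  = Struct.new(:key, :a, :b)
dest_type = Struct.new(:a, :b)

src  = src_type.new("k1", 10, 20)
dest = copy_struct(src, dest_type.new, 1) # skip the leading :key field

p dest.to_a # prints [10, 20]
```

Note that `dest` needs at least `src.length - skip` members; assigning past the end of a Struct raises an IndexError.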
#declare ⇒ Object
    # File 'lib/mrtoolkit.rb', line 33
    def declare
    end
#emit(name) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 19
    def emit name
      @out_fields = [] unless @out_fields
      @out_fields << name.to_sym
    end
#emit_separator(sep) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 26
    def emit_separator sep
      @out_sep = sep
    end
#field(name) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 15
    def field name
      @in_fields = [] unless @in_fields
      @in_fields << name.to_sym
    end
#field_separator(sep) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 23
    def field_separator sep
      @in_sep = sep
    end
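The four declaration methods (field, emit, field_separator, emit_separator) only accumulate state. A rough sketch of how they might be used together follows; `DeclStage` is a hypothetical stand-in that repeats the method bodies from the listings, and the field names are invented for illustration.

```ruby
# Hypothetical stand-in class; it mirrors the four declaration methods only.
class DeclStage
  attr_reader :in_fields, :out_fields, :in_sep, :out_sep

  def field(name)
    @in_fields = [] unless @in_fields
    @in_fields << name.to_sym
  end

  def emit(name)
    @out_fields = [] unless @out_fields
    @out_fields << name.to_sym
  end

  def field_separator(sep)
    @in_sep = sep
  end

  def emit_separator(sep)
    @out_sep = sep
  end
end

stage = DeclStage.new
stage.field "ip"            # strings are converted to symbols
stage.field :url
stage.field_separator " "
stage.emit :url
stage.emit :hits
stage.emit_separator ","

p stage.in_fields  # prints [:ip, :url]
p stage.out_fields # prints [:url, :hits]
```

Fields are stored in declaration order, which is the order later used to map split input columns and to join output columns.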
#new_input(line = nil) ⇒ Object
    # File 'lib/mrtoolkit.rb', line 60
    def new_input(line = nil)
      input = @input_type.new
      return input unless line
      fields = line.chomp.split(@in_sep)
      @in_fields.each_index { |i| input[i] = fields[i] }
      input
    end
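The parsing step can be sketched outside the class as follows. The field names and the sample line are assumptions made for this example; the logic follows the listing: chomp the line, split on the input separator, and assign columns by position.

```ruby
in_fields  = [:host, :status, :bytes] # hypothetical field names
in_sep     = "\t"
input_type = Struct.new(*in_fields)

line   = "example.org\t200\n"         # only two of the three fields are present
fields = line.chomp.split(in_sep)
input  = input_type.new
in_fields.each_index { |i| input[i] = fields[i] }

p input.to_a # prints ["example.org", "200", nil]
```

Two consequences are visible here: values stay strings (no type conversion is applied), and a short line leaves the trailing fields nil. Extra columns beyond the declared fields would be ignored, since only declared indexes are assigned.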
#new_output ⇒ Object
    # File 'lib/mrtoolkit.rb', line 67
    def new_output
      @output_type.new
    end
#prepare ⇒ Object
Create the input and output structures.
    # File 'lib/mrtoolkit.rb', line 37
    def prepare
      @in_sep = "\t" unless @in_sep
      @out_sep = "\t" unless @out_sep
      @input_type = Struct.new(*@in_fields)
      @output_type = Struct.new(*@out_fields)
      @errors = 0
    end
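To illustrate what prepare builds, here is a small sketch of creating a record type from a list of field symbols with Struct.new. The field names are hypothetical; only the Struct.new(*fields) pattern comes from the listing.

```ruby
in_fields  = [:user, :bytes]          # hypothetical declared fields
input_type = Struct.new(*in_fields)   # anonymous Struct class with those members

rec = input_type.new                  # all members start as nil
rec.user = "alice"                    # access by generated accessor
rec[1]   = "512"                      # or by position

p input_type.members # prints [:user, :bytes]
p rec.to_a           # prints ["alice", "512"]
```

Because the struct supports both positional and named access, input parsing can fill it by index while step functions read it by field name.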
#process_step(fun, input = nil) ⇒ Object
Process one line of a map or reduce file: create an output record, call the given function, then collect the output and write it out.
    # File 'lib/mrtoolkit.rb', line 75
    def process_step(fun, input = nil)
      begin
        out = send(fun, input, new_output)
        if out
          out = [out] unless out.class == Array
          out.each {|o| write_out(o)}
        end
      rescue StandardError
        STDERR.puts "Error: #{$!}"
        @errors += 1
        raise unless @catch_errors
      end
    end
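The control flow of process_step can be sketched with a hypothetical stand-in class. `MiniStage`, its step functions, and the `written` array (which stands in for write_out) are assumptions for illustration; the wrapping of single results, the skipping of nil, and the error counting follow the listing.

```ruby
# Hypothetical stand-in that mirrors the process_step logic only.
class MiniStage
  attr_reader :errors, :written

  def initialize(catch_errors)
    @catch_errors = catch_errors
    @errors  = 0
    @written = []                 # stands in for write_out
  end

  def process_step(fun, input = nil)
    out = send(fun, input, {})    # {} stands in for new_output
    if out
      out = [out] unless out.class == Array
      out.each { |o| @written << o }
    end
  rescue StandardError
    STDERR.puts "Error: #{$!}"
    @errors += 1
    raise unless @catch_errors    # re-raise unless errors are being caught
  end

  # Hypothetical step functions with the (input, output) signature.
  def one(input, _output)
    input.upcase                  # a single record
  end

  def many(input, _output)
    input.split(",")              # several records as an Array
  end

  def none(_input, _output)
    nil                           # nothing to write
  end

  def broken(_input, _output)
    raise ArgumentError, "bad record"
  end
end

stage = MiniStage.new(true)
stage.process_step(:one, "a")
stage.process_step(:many, "b,c")
stage.process_step(:none, "d")
stage.process_step(:broken, "e")

p stage.written # prints ["A", "b", "c"]
p stage.errors  # prints 1
```

With catch_errors disabled, the same failing step would still increment the error count but would then re-raise the exception to the caller.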
#write_out(output) ⇒ Object
Write any output.
    # File 'lib/mrtoolkit.rb', line 53
    def write_out(output)
      if output
        outs = @out_fields.collect { |f| output[f].to_s.chomp }
        @out_fd.puts outs.join(@out_sep)
      end
    end
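A sketch of the output formatting, using a StringIO in place of @out_fd. The listing does not show where @out_fd is assigned, so the sink used here, like the field names, is an assumption for illustration.

```ruby
require "stringio"

out_fields  = [:word, :count]         # hypothetical output fields
out_sep     = "\t"
output_type = Struct.new(*out_fields)
out_fd      = StringIO.new            # stands in for @out_fd

output = output_type.new("ruby\n", 3)
# Each field is stringified and stripped of a trailing newline before joining.
outs = out_fields.collect { |f| output[f].to_s.chomp }
out_fd.puts outs.join(out_sep)

p out_fd.string # prints "ruby\t3\n"
```

Since nil.to_s is an empty string, an unset output field would be written as an empty column and would not raise.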