Class: Stage

Inherits: Object
Defined in:
lib/mrtoolkit.rb

Overview

Stores information about a processing stage, including input and output field names, field separators, and the filenames processed by the stage.

Direct Known Subclasses

MapBase, ReduceBase

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(*args) ⇒ Stage

Returns a new instance of Stage.



12
13
# File 'lib/mrtoolkit.rb', line 12

# Accepts and ignores any constructor arguments. The stage's instance
# variables are filled in later by other methods such as #field, #emit
# and #prepare.
def initialize(*_args)
end

Instance Attribute Details

#errors ⇒ Object (readonly)

Returns the value of attribute errors.



10
11
12
# File 'lib/mrtoolkit.rb', line 10

# Number of errors counted by #process_step since the last #prepare
# (nil until #prepare has run).
def errors; @errors; end

#in_fields ⇒ Object (readonly)

Returns the value of attribute in_fields.



8
9
10
# File 'lib/mrtoolkit.rb', line 8

# Input field names as Symbols, in the order they were declared with
# #field; nil if no input field has been declared.
def in_fields; @in_fields; end

#in_sep ⇒ Object (readonly)

Returns the value of attribute in_sep.



9
10
11
# File 'lib/mrtoolkit.rb', line 9

# Separator used to split input lines, as set by #field_separator
# (#prepare substitutes a tab when none was given).
def in_sep; @in_sep; end

#out_fields ⇒ Object (readonly)

Returns the value of attribute out_fields.



8
9
10
# File 'lib/mrtoolkit.rb', line 8

# Output field names as Symbols, in the order they were declared with
# #emit; nil if no output field has been declared.
def out_fields; @out_fields; end

#out_sep ⇒ Object (readonly)

Returns the value of attribute out_sep.



9
10
11
# File 'lib/mrtoolkit.rb', line 9

# Separator used to join output fields, as set by #emit_separator
# (#prepare substitutes a tab when none was given).
def out_sep; @out_sep; end

Instance Method Details

#catch_errors ⇒ Object



29
30
31
# File 'lib/mrtoolkit.rb', line 29

# Controls whether #process_step swallows errors raised by a step.
#
# When enabled, a StandardError raised inside #process_step is still logged
# and counted in @errors, but it is not re-raised. Calling it with no argument
# keeps the original behaviour (enable). Passing +false+ restores the default
# re-raise behaviour.
#
# enabled - truthy to catch errors, falsy to let them propagate
#           (default: true).
#
# Returns +enabled+.
def catch_errors(enabled = true)
  @catch_errors = enabled
end

#copy_struct(src, dest, skip = 0) ⇒ Object

Copies all fields of one struct to another, position by position. The first `skip` fields of the source can optionally be skipped.



47
48
49
50
# File 'lib/mrtoolkit.rb', line 47

# Copies the values of +src+ into +dest+ by position, dropping the first
# +skip+ values of +src+: dest[0] = src[skip], dest[1] = src[skip + 1], ...
# Both arguments only need integer indexing and (for +src+) #length, so
# Structs and Arrays both work.
#
# Returns +dest+.
def copy_struct(src, dest, skip = 0)
  skip.upto(src.length - 1) { |from| dest[from - skip] = src[from] }
  dest
end

#declare ⇒ Object



33
34
# File 'lib/mrtoolkit.rb', line 33

# Declaration hook; does nothing here and returns nil.
# NOTE(review): presumably overridden by subclasses (MapBase, ReduceBase)
# to call #field / #emit etc. — confirm against the subclasses.
def declare; end

#emit(name) ⇒ Object



19
20
21
22
# File 'lib/mrtoolkit.rb', line 19

# Declares an output field. +name+ is converted with #to_sym and appended
# to @out_fields, which is created on first use.
#
# Returns the (mutated) output field list.
def emit(name)
  (@out_fields ||= []) << name.to_sym
end

#emit_separator(sep) ⇒ Object



26
27
28
# File 'lib/mrtoolkit.rb', line 26

# Sets the string used to join output fields in #write_out.
# Returns +sep+.
def emit_separator(sep)
  @out_sep = sep
end

#field(name) ⇒ Object



15
16
17
18
# File 'lib/mrtoolkit.rb', line 15

# Declares an input field. +name+ is converted with #to_sym and appended
# to @in_fields, which is created on first use.
#
# Returns the (mutated) input field list.
def field(name)
  (@in_fields ||= []) << name.to_sym
end

#field_separator(sep) ⇒ Object



23
24
25
# File 'lib/mrtoolkit.rb', line 23

# Sets the separator used by #new_input to split input lines.
# Returns +sep+.
def field_separator(sep)
  @in_sep = sep
end

#new_input(line = nil) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/mrtoolkit.rb', line 60

# Builds an input record from the Struct class created by #prepare.
#
# With no +line+, an empty record is returned. Otherwise +line+ is chomped
# and split on @in_sep, and one column is stored per declared input field,
# by position. Missing columns stay nil; extra columns are ignored.
#
# line - one raw input line, or nil (default).
#
# Returns the new input record.
def new_input(line = nil)
  input = @input_type.new
  return input unless line

  # Without a limit, String#split silently drops trailing empty columns, so an
  # empty last field came back as nil while an empty middle field came back
  # as "". A limit of -1 keeps every column, so empty fields are always "".
  fields = line.chomp.split(@in_sep, -1)
  @in_fields.each_index { |i| input[i] = fields[i] }
  input
end

#new_output ⇒ Object



67
68
69
# File 'lib/mrtoolkit.rb', line 67

# Returns a fresh output record: a new instance of the record class stored
# in @output_type by #prepare, created with no arguments.
def new_output
  record_class = @output_type
  record_class.new
end

#prepare ⇒ Object

Create the input and output structures.



37
38
39
40
41
42
43
# File 'lib/mrtoolkit.rb', line 37

# Finalises the stage before processing:
# * defaults both field separators to a tab when they were never set;
# * builds the input and output record classes (Structs) from the declared
#   field names;
# * resets the error counter.
#
# Returns 0 (the reset error count).
def prepare
  @in_sep ||= "\t"
  @out_sep ||= "\t"
  @input_type = Struct.new(*@in_fields)
  @output_type = Struct.new(*@out_fields)
  @errors = 0
end

#process_step(fun, input = nil) ⇒ Object

Processes one line of a map or reduce file: creates an output record, calls the given function, collects its output, and writes it out.



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/mrtoolkit.rb', line 75

# Runs one map/reduce step and writes whatever it produces.
#
# Calls the method named +fun+ with +input+ and a fresh output record
# (#new_output). A truthy result is passed to #write_out. A result whose class
# is exactly Array is written element by element; anything else is written
# as a single record.
#
# Any StandardError raised along the way is reported on STDERR and counted
# in @errors. It is re-raised unless @catch_errors is truthy (see
# #catch_errors), in which case nil is returned.
def process_step(fun, input = nil)
  result = send(fun, input, new_output)
  return nil unless result

  records = result.instance_of?(Array) ? result : [result]
  records.each { |record| write_out(record) }
rescue StandardError => e
  STDERR.puts "Error: #{e}"
  @errors += 1
  raise unless @catch_errors
end

#write_out(output) ⇒ Object

Writes the output record, if any, as one line of separator-joined field values.



53
54
55
56
57
58
# File 'lib/mrtoolkit.rb', line 53

# Writes +output+ to @out_fd as a single line, and does nothing when
# +output+ is nil or false. For each name in @out_fields, output[name] is
# converted with #to_s and one trailing newline is chomped. The values are
# then joined with @out_sep.
#
# Returns nil.
def write_out(output)
  return unless output

  line = @out_fields.map { |name| output[name].to_s.chomp }.join(@out_sep)
  @out_fd.puts(line)
end