Class: JobBase

Inherits:
Object
  • Object
show all
Defined in:
lib/mrtoolkit.rb

Overview

Base class for jobs

Constant Summary collapse

@@testing =
false

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ JobBase

Returns a new instance of JobBase.



725
726
727
# File 'lib/mrtoolkit.rb', line 725

# Create a job that has no stages recorded yet.
#
# args:: accepted so callers may pass anything to the constructor;
#        the values are not used here.
def initialize(*args)
  # Stage records are appended to this list by add_stage.
  @stages = []
end

Class Method Details

.filename_map(filename) ⇒ Object

Change filename so a path maps into a simple name.

  / ==> -
  * ==> all
  []? ==> _



737
738
739
# File 'lib/mrtoolkit.rb', line 737

# Flatten a path or glob pattern into a simple name.
#
# Substitutions applied:
#   *        ==> all
#   /        ==> -
#   [ ] ?    ==> _
#
# filename:: String holding the path or pattern to flatten.
#
# Returns a new String; the argument itself is not modified.
def self.filename_map(filename)
  # Literal pattern and tr do the same substitutions as the regex gsubs,
  # without building a regex for single-character translation.
  filename.gsub('*', 'all').tr('/', '-').tr('[]?', '_')
end

.get_job_opts ⇒ Object



847
848
849
850
851
852
853
854
# File 'lib/mrtoolkit.rb', line 847

# Read job-level options from the front of ARGV.
#
# Only a leading "-v" is recognised. When present it is removed from
# ARGV and reported as :verbose => true.
#
# Returns a Hash of the options found (empty when there is no "-v").
def self.get_job_opts
  return {} unless ARGV.first == '-v'

  # Consume the flag so later argument handling does not see it.
  ARGV.shift
  { :verbose => true }
end

.run_command(opt = nil) ⇒ Object



879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
# File 'lib/mrtoolkit.rb', line 879

# Entry point: decide from the test flag and ARGV what to run.
#
# * When @@testing is set: do nothing if opt is :at_exit, otherwise
#   return the result of the class-level run_test.
# * When ARGV starts with "-s": the next argument names a class. It is
#   looked up with Object.const_get, instantiated with the remaining
#   arguments, and driven through declare, prepare and run(STDIN, STDOUT).
# * Otherwise: build an instance of the receiving class, prepare it, and
#   run it with the basename of $0 and the options from get_job_opts.
#
# opt:: optional marker; only the value :at_exit is inspected.
#
# Raises ArgumentError when "-s" is given without a class name.
def self.run_command(opt = nil)
  if @@testing
    return if opt == :at_exit
    return run_test
  end

  if ARGV[0] == '-s'
    ARGV.shift
    class_name = ARGV.shift
    # Fail with a clear message instead of the TypeError const_get(nil) raises.
    raise ArgumentError, "-s requires a class name" if class_name.nil?
    action = Object.const_get(class_name).new(*ARGV)
    action.declare
    action.prepare
    action.run(STDIN, STDOUT)
  else
    opts = get_job_opts
    # create an instance of the class that was called originally
    action = self.new
    action.prepare
    # The script name is always $0; the old `unless filename` guard tested
    # a local that was nil at that point, so it never had any effect.
    action.run(File.basename($0), opts)
  end
end

.run_test ⇒ Object



873
874
875
876
877
# File 'lib/mrtoolkit.rb', line 873

# Build an instance of the receiving class, prepare it, and run it through
# the instance-level run_test.
#
# Returns whatever the instance's run_test returns.
def self.run_test
  new.tap { |job| job.prepare }.run_test
end

.testing(val) ⇒ Object



729
730
731
# File 'lib/mrtoolkit.rb', line 729

# Turn test mode on or off.
#
# val:: value stored in the class variable @@testing, which run_command
#       consults to decide whether to call run_test instead of running
#       the job normally.
def self.testing(val)
  @@testing = val
end

Instance Method Details

#add_stage ⇒ Object

This gathers the declarations and stores them in a stage record.



780
781
782
783
784
785
786
787
788
789
790
# File 'lib/mrtoolkit.rb', line 780

# Bundle the current declarations into one stage record and append it to
# @stages.
#
# The record is an Array in this fixed order:
#   map class, map args, map opts,
#   reduce class, reduce args, reduce opts,
#   input dirs, output dir, reducer count, extras
#
# Raises RuntimeError when the map class, reduce class, input dirs or
# output dir has not been declared.
def add_stage
  raise "Map class not specified"    if @map_class.nil?
  raise "Reduce class not specified" if @reduce_class.nil?
  raise "Indir not specified"        if @in_dirs.empty?
  raise "Outdir not specified"       if @out_dir.nil?

  @stages << [@map_class, @map_args, @map_opts,
              @reduce_class, @reduce_args, @reduce_opts,
              @in_dirs, @out_dir, @reducers, @extras]
end

#build_command(fname, klass, args) ⇒ Object



839
840
841
842
843
844
845
# File 'lib/mrtoolkit.rb', line 839

# Build the command line that re-invokes a script for one step class.
#
# fname:: script name placed at the start of the command.
# klass:: class whose name follows the "-s" switch.
# args::  Array of extra arguments, joined with single spaces; may be
#         nil or empty.
#
# Returns the command String. Nothing is appended when there are no
# arguments, so the result never ends in a stray space.
def build_command(fname, klass, args)
  command = "#{fname} -s #{klass}"
  return command if !args || args.empty?

  "#{command} #{args.join(' ')}"
end

#extra(ex) ⇒ Object



766
767
768
# File 'lib/mrtoolkit.rb', line 766

# Append one entry to the current stage's list of extras.
#
# ex:: the entry to add. The run method passes the collected extras on to
#      the stream runner together with this file and 'stream_runner.rb'.
#
# Returns the extras list.
def extra(ex)
  @extras.push(ex)
end

#hadoop_opts(name) ⇒ Object



775
776
777
778
# File 'lib/mrtoolkit.rb', line 775

# Record one more Hadoop option for the job.
#
# name:: the option to remember. The run method joins all recorded
#        options with spaces and passes them on under :hadoop_opts.
#
# Returns the list of options recorded so far.
def hadoop_opts(name)
  # The list is created on first use.
  (@hadoop_opts ||= []) << name
end

#indir(in_dir) ⇒ Object Also known as: infiles



755
756
757
# File 'lib/mrtoolkit.rb', line 755

# Add one input location to the current stage.
#
# in_dir:: the location to read from; stored exactly as given.
#
# Returns the list of input locations.
def indir(in_dir)
  @in_dirs.push(in_dir)
end

#map_opt(n, v) ⇒ Object



769
770
771
# File 'lib/mrtoolkit.rb', line 769

# Set one option for the current stage's map step.
#
# n:: option name (used as the key).
# v:: option value.
#
# Returns v.
def map_opt(n, v)
  @map_opts.store(n, v)
end

#mapper(map_class, *args) ⇒ Object

These store job declarations



742
743
744
745
746
747
748
# File 'lib/mrtoolkit.rb', line 742

# Declare the map class for a stage and start that stage's declarations
# from scratch.
#
# map_class:: class that implements the map step.
# args::      arguments kept for constructing map_class later.
#
# Besides recording the class and its arguments, this resets the map
# options, the input locations and the extras to empty collections.
#
# Returns the fresh (empty) extras list.
def mapper(map_class, *args)
  @map_class, @map_args = map_class, args
  @map_opts = {}
  # Two separate arrays: inputs and extras must not share storage.
  @in_dirs, @extras = [], []
  @extras
end

#outdir(out_dir) ⇒ Object Also known as: outfiles



759
760
761
# File 'lib/mrtoolkit.rb', line 759

# Declare where the current stage writes its output.
#
# out_dir:: requested output location. It is passed through
#           JobBase.filename_map before being stored.
#
# Returns the mapped name that was stored.
def outdir(out_dir)
  mapped_name = JobBase.filename_map(out_dir)
  @out_dir = mapped_name
end

#prepare ⇒ Object

For each method in the class starting with “stage”, call the method, then call add_stage. This can be used to create multi-stage map-reduce programs.



795
796
797
798
799
800
801
802
803
# File 'lib/mrtoolkit.rb', line 795

# Collect the stages of this job.
#
# Every instance method of the class whose name starts with "stage", or
# is exactly "job", is called in turn, and add_stage is called after each
# one to record what that method declared.
#
# Methods run in natural name order: runs of digits compare as numbers,
# so stage2 runs before stage10. A plain string sort would put stage10
# first.
def prepare
  stage_methods = self.class.instance_methods.select do |m|
    m.to_s =~ /(^stage)|(^job$)/
  end

  ordered = stage_methods.sort_by do |m|
    # Split the name into digit and non-digit runs. The leading tag keeps
    # each pair of compared elements the same type, so <=> never fails.
    m.to_s.scan(/\d+|\D+/).map do |part|
      part =~ /\A\d/ ? [0, part.to_i, part] : [1, 0, part]
    end
  end

  ordered.each do |m|
    send(m)
    add_stage
  end
end

#reduce_opt(n, v) ⇒ Object



772
773
774
# File 'lib/mrtoolkit.rb', line 772

# Set one option for the current stage's reduce step.
#
# n:: option name (used as the key).
# v:: option value.
#
# Returns v.
def reduce_opt(n, v)
  @reduce_opts.store(n, v)
end

#reducer(reduce_class, *args) ⇒ Object



749
750
751
752
753
754
# File 'lib/mrtoolkit.rb', line 749

# Declare the reduce class for a stage.
#
# reduce_class:: class that implements the reduce step.
# args::         arguments kept for constructing reduce_class later.
#
# This also resets the reducer count to 1 and clears the reduce options,
# so a reducers or reduce_opt call has to come after this one to stick.
#
# Returns the fresh (empty) reduce options hash.
def reducer(reduce_class, *args)
  @reduce_class, @reduce_args = reduce_class, args
  @reducers = 1
  @reduce_opts = {}
end

#reducers(n) ⇒ Object



763
764
765
# File 'lib/mrtoolkit.rb', line 763

# Set the reducer count recorded for the current stage.
#
# n:: the count to store. Note that reducer resets the stored count to 1,
#     so this has to be called after reducer to take effect.
#
# Returns n.
def reducers(n)
  @reducers = n
end

#run(fname, opts) ⇒ Object



856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
# File 'lib/mrtoolkit.rb', line 856

# Run every recorded stage, in order, through a StreamRunner.
#
# fname:: script name used to build the map and reduce commands.
# opts::  Hash of job options. It is not modified; when Hadoop options
#         were recorded with hadoop_opts, a copy with :hadoop_opts set to
#         those options joined by spaces is passed on instead.
#
# For each stage, run_map_reduce receives the input dirs, the output dir,
# the map and reduce commands from build_command, the reducer count, the
# extras (prefixed with this file and 'stream_runner.rb'), the map and
# reduce options, and the job options.
def run(fname, opts)
  sr = StreamRunner.new

  # The Hadoop options do not change between stages, so merge them once.
  if @hadoop_opts && !@hadoop_opts.empty?
    opts = opts.merge({:hadoop_opts => @hadoop_opts.join(" ")})
  end

  @stages.each do |stage|
    map_class, map_args, map_opts,
        reduce_class, reduce_args, reduce_opts,
        in_dirs, out_dir, reducers, extras = stage
    sr.run_map_reduce(in_dirs, out_dir,
      build_command(fname, map_class, map_args),
      build_command(fname, reduce_class, reduce_args),
      reducers,
      [__FILE__, 'stream_runner.rb'] + extras,
      map_opts, reduce_opts, opts)
  end
end

#run_test ⇒ Object

Run the job. For each stage, run the mapper, then sort the intermediate output, then run the reducer.



808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
# File 'lib/mrtoolkit.rb', line 808

# Run the job locally, one stage at a time.
#
# For each stage: run the map class over every input into one intermediate
# file, sort that file with the system `sort` command, then run the reduce
# class over the sorted data into the stage's output location.
#
# The intermediate files are the fixed paths /tmp/map-out and
# /tmp/reduce-in, and they are reused for every stage.
def run_test
  map_out_file = "/tmp/map-out"
  red_in_file = "/tmp/reduce-in"
  @stages.each do |stage|
    map_class, map_args, map_opts,
        reduce_class, reduce_args, reduce_opts,
        in_dirs, out_dir, reducers, extras = stage

    map_task = map_class.new(*map_args)
    map_task.declare
    map_task.prepare
    # Open the map output once for the whole stage. Reopening it with "w"
    # for every input left only the last input's records in the file.
    # NOTE(review): assumes StreamIO.open with "w" truncates like File.open
    # and that the mapper's run does not close out_fd -- confirm.
    StreamIO.open(map_out_file, "w") do |out_fd|
      in_dirs.each do |in_dir|
        StreamIO.open(in_dir, "r") do |in_fd|
          map_task.run in_fd, out_fd
        end
      end
    end

    system "sort <#{map_out_file} >#{red_in_file}"

    reduce_task = reduce_class.new(*reduce_args)
    reduce_task.declare
    reduce_task.prepare
    StreamIO.open(red_in_file, "r") do |in_fd|
      StreamIO.open(out_dir, "w") do |out_fd|
        reduce_task.run in_fd, out_fd
      end
    end
  end
end