Class: JobBase
- Inherits:
-
Object
- Object
- JobBase
- Defined in:
- lib/mrtoolkit.rb
Overview
Base class for jobs
Constant Summary collapse
- @@testing =
false
Class Method Summary collapse
-
.filename_map(filename) ⇒ Object
Change filename so a path maps into a simple name.
- .get_job_opts ⇒ Object
- .run_command(opt = nil) ⇒ Object
- .run_test ⇒ Object
- .testing(val) ⇒ Object
Instance Method Summary collapse
-
#add_stage ⇒ Object
Gathers the current declarations and stores them in a stage record.
- #build_command(fname, klass, args) ⇒ Object
- #extra(ex) ⇒ Object
- #hadoop_opts(name) ⇒ Object
- #indir(in_dir) ⇒ Object (also: #infiles)
-
#initialize(*args) ⇒ JobBase
constructor
A new instance of JobBase.
- #map_opt(n, v) ⇒ Object
-
#mapper(map_class, *args) ⇒ Object
These store job declarations.
- #outdir(out_dir) ⇒ Object (also: #outfiles)
-
#prepare ⇒ Object
For each method in the class whose name starts with “stage” (or is exactly “job”), call the method, then call add_stage.
- #reduce_opt(n, v) ⇒ Object
- #reducer(reduce_class, *args) ⇒ Object
- #reducers(n) ⇒ Object
- #run(fname, opts) ⇒ Object
-
#run_test ⇒ Object
Run the job.
Constructor Details
#initialize(*args) ⇒ JobBase
Returns a new instance of JobBase.
725 726 727 |
# File 'lib/mrtoolkit.rb', line 725
# Constructor. Starts the job with an empty list of stages; any arguments
# passed in are accepted but not used here.
def initialize(*args)
  @stages = Array.new
end
Class Method Details
.filename_map(filename) ⇒ Object
Change filename so a path maps into a simple name.
/ ==> -
* ==> all
[]? ==> _
737 738 739 |
# File 'lib/mrtoolkit.rb', line 737
# Flattens a path-like name into a simple one:
#   *            becomes "all"
#   /            becomes "-"
#   [ , ] and ?  become "_"
# Returns a new string; the argument is not modified.
def self.filename_map(filename)
  expanded = filename.gsub('*', 'all')
  expanded.tr('/', '-').tr('[]?', '_')
end
.get_job_opts ⇒ Object
847 848 849 850 851 852 853 854 |
# File 'lib/mrtoolkit.rb', line 847
# Reads job-level options from the front of ARGV. Only a leading "-v" is
# recognized: it is removed from ARGV and reported as :verbose => true.
# Returns the options hash, which is empty when "-v" is not the first
# argument.
def self.get_job_opts
  return {} unless ARGV.first == '-v'

  ARGV.shift
  { :verbose => true }
end
.run_command(opt = nil) ⇒ Object
879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 |
# File 'lib/mrtoolkit.rb', line 879
# Command-line entry point.
#
# In testing mode (@@testing set) this runs the local test instead, except
# when called with :at_exit, in which case it does nothing.
#
# Otherwise, when the first argument is "-s", the next argument names a
# class; that class is instantiated with the remaining arguments, then
# declared, prepared and run against STDIN/STDOUT. This is the command
# shape that build_command produces.
#
# Without "-s", job options are read from ARGV, an instance of the
# receiving class is prepared, and it is run under the basename of the
# running script ($0).
def self.run_command(opt = nil)
  if @@testing
    return if opt == :at_exit
    return run_test
  end

  unless ARGV[0] == '-s'
    job_opts = get_job_opts
    # create an instance of the class that was called originally
    job = self.new
    job.prepare
    return job.run(File.basename($0), job_opts)
  end

  ARGV.shift
  class_name = ARGV.shift
  # NOTE(review): the class name is taken straight from the command line.
  worker = Object.const_get(class_name).new(*ARGV)
  worker.declare
  worker.prepare
  worker.run(STDIN, STDOUT)
end
.run_test ⇒ Object
873 874 875 876 877 |
# File 'lib/mrtoolkit.rb', line 873
# Builds an instance of the receiving class, prepares its stages, and runs
# them through the instance-level run_test. Returns whatever that returns.
def self.run_test
  instance = self.new
  instance.prepare
  instance.run_test
end
.testing(val) ⇒ Object
729 730 731 |
# File 'lib/mrtoolkit.rb', line 729
# Sets the class-level @@testing flag to val and returns val.
# run_command reads this flag to decide whether to run the local test
# instead of the real job.
def self.testing(val)
  @@testing = val
end
Instance Method Details
#add_stage ⇒ Object
Gathers the current declarations and stores them in a stage record.
780 781 782 783 784 785 786 787 788 789 790 |
# File 'lib/mrtoolkit.rb', line 780
# Gathers the current declarations and appends them to @stages as one
# stage record. Returns @stages.
#
# Raises RuntimeError when the map class, the reduce class, the input
# directories or the output directory have not been declared.
def add_stage
  raise "Map class not specified" if @map_class.nil?
  raise "Reduce class not specified" if @reduce_class.nil?
  raise "Indir not specified" if @in_dirs.empty?
  raise "Outdir not specified" if @out_dir.nil?

  # Field order matters: run and run_test unpack this record positionally.
  @stages << [@map_class, @map_args, @map_opts,
              @reduce_class, @reduce_args, @reduce_opts,
              @in_dirs, @out_dir, @reducers, @extras]
end
#build_command(fname, klass, args) ⇒ Object
839 840 841 842 843 844 845 |
# File 'lib/mrtoolkit.rb', line 839
# Builds the command string "<fname> -s <klass>", followed by the
# space-joined args when args is given. An empty args array still adds
# the separating space.
def build_command(fname, klass, args)
  command = "#{fname} -s #{klass}"
  return command unless args

  "#{command} #{args.join(' ')}"
end
#extra(ex) ⇒ Object
766 767 768 |
# File 'lib/mrtoolkit.rb', line 766
# Appends ex to the current stage's extras list and returns that list.
# The list is created by mapper, so this must be called after it.
def extra(ex)
  @extras.push(ex)
end
#hadoop_opts(name) ⇒ Object
775 776 777 778 |
# File 'lib/mrtoolkit.rb', line 775
# Appends name to the job's list of Hadoop options, creating the list on
# first use. Returns the list. run joins these with spaces and passes them
# along under the :hadoop_opts key.
def hadoop_opts(name)
  (@hadoop_opts ||= []) << name
end
#indir(in_dir) ⇒ Object Also known as: infiles
755 756 757 |
# File 'lib/mrtoolkit.rb', line 755
# Adds in_dir to the current stage's input list and returns that list.
# The name is stored as given (unlike outdir, no filename mapping is
# applied). The list is created by mapper, so this must be called after it.
def indir(in_dir)
  @in_dirs.push(in_dir)
end
#map_opt(n, v) ⇒ Object
769 770 771 |
# File 'lib/mrtoolkit.rb', line 769
# Records option n with value v for the current stage's mapper and returns
# v. The options hash is created by mapper, so this must be called after it.
def map_opt(n, v)
  @map_opts.store(n, v)
end
#mapper(map_class, *args) ⇒ Object
These store job declarations
742 743 744 745 746 747 748 |
# File 'lib/mrtoolkit.rb', line 742
# Declares the map class for a stage, along with the arguments used to
# construct it. This also resets the stage's map options, input
# directories and extras to empty, so it must come before indir, map_opt
# and extra. Returns the fresh (empty) extras list.
def mapper(map_class, *args)
  @map_class, @map_args = map_class, args
  @map_opts = Hash.new
  @in_dirs = Array.new
  @extras = Array.new
end
#outdir(out_dir) ⇒ Object Also known as: outfiles
759 760 761 |
# File 'lib/mrtoolkit.rb', line 759
# Sets the current stage's output location. The name is passed through
# JobBase.filename_map first, so path and glob characters are replaced.
# Returns the mapped name.
def outdir(out_dir)
  mapped = JobBase.filename_map(out_dir)
  @out_dir = mapped
end
#prepare ⇒ Object
For each method in the class whose name starts with “stage” (or is exactly “job”), taken in sorted name order, call the method, then call add_stage. This can be used to create multi-stage map-reduce programs.
795 796 797 798 799 800 801 802 803 |
# File 'lib/mrtoolkit.rb', line 795
# Finds every instance method of this class whose name starts with "stage"
# or is exactly "job", and, in sorted name order, calls it and then calls
# add_stage to record what it declared.
#
# The sort is a plain name sort, so "stage10" comes before "stage2".
# Returns the sorted list of method names.
def prepare
  stage_names = self.class.instance_methods.grep(/(^stage)|(^job$)/).sort
  stage_names.each do |stage_name|
    __send__(stage_name)
    add_stage
  end
end
#reduce_opt(n, v) ⇒ Object
772 773 774 |
# File 'lib/mrtoolkit.rb', line 772
# Records option n with value v for the current stage's reducer and
# returns v. The options hash is created by reducer, so this must be
# called after it.
def reduce_opt(n, v)
  @reduce_opts.store(n, v)
end
#reducer(reduce_class, *args) ⇒ Object
749 750 751 752 753 754 |
# File 'lib/mrtoolkit.rb', line 749
# Declares the reduce class for a stage, along with the arguments used to
# construct it. This also resets the reducer count to 1 and clears the
# reduce options, so it must come before reducers and reduce_opt.
# Returns the fresh (empty) reduce options hash.
def reducer(reduce_class, *args)
  @reduce_class, @reduce_args = reduce_class, args
  @reducers = 1
  @reduce_opts = Hash.new
end
#reducers(n) ⇒ Object
763 764 765 |
# File 'lib/mrtoolkit.rb', line 763
# Sets the reducer count for the current stage and returns n.
# reducer resets this count to 1, so call this after reducer.
def reducers(n)
  @reducers = n
end
#run(fname, opts) ⇒ Object
856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 |
# File 'lib/mrtoolkit.rb', line 856
# Runs every recorded stage, in order, through a StreamRunner.
#
# fname is the script name used to build the mapper and reducer commands
# (see build_command); opts is the job options hash. When Hadoop options
# have been declared via hadoop_opts, they are joined with spaces and
# merged into opts under :hadoop_opts. Returns @stages.
def run(fname, opts)
  runner = StreamRunner.new
  @stages.each do |map_class, map_args, map_opts, reduce_class, reduce_args, reduce_opts, in_dirs, out_dir, reducers, extras|
    if @hadoop_opts && @hadoop_opts.size > 0
      opts = opts.merge({:hadoop_opts => @hadoop_opts.join(" ")})
    end

    map_command = build_command(fname, map_class, map_args)
    reduce_command = build_command(fname, reduce_class, reduce_args)
    # This library file and stream_runner.rb always head the file list;
    # presumably these are the files shipped with the job - TODO confirm
    # against StreamRunner#run_map_reduce.
    job_files = [__FILE__, 'stream_runner.rb'] + extras

    runner.run_map_reduce(in_dirs, out_dir,
                          map_command, reduce_command,
                          reducers, job_files,
                          map_opts, reduce_opts, opts)
  end
end
#run_test ⇒ Object
Run the job. For each stage, run the mapper, then sort the intermediate output, then run the reducer.
808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 |
# File 'lib/mrtoolkit.rb', line 808
# Runs the job locally, without Hadoop. For each stage: run the mapper over
# every input, sort the intermediate output with the system `sort`
# command, then run the reducer over the sorted data into the stage's
# output. Returns @stages.
#
# The intermediate files are the fixed paths /tmp/map-out and
# /tmp/reduce-in, and are reused by every stage.
def run_test
  map_out_file = "/tmp/map-out"
  red_in_file = "/tmp/reduce-in"

  @stages.each do |map_class, map_args, _map_opts, reduce_class, reduce_args, _reduce_opts, in_dirs, out_dir, _reducers, _extras|
    map_task = map_class.new(*map_args)
    map_task.declare
    map_task.prepare

    # Open the map output once per stage and feed every input into it.
    # Reopening it in "w" mode for each input would leave only the last
    # input's output behind (assumes StreamIO.open truncates on "w" like
    # File.open - TODO confirm).
    StreamIO.open(map_out_file, "w") do |out_fd|
      in_dirs.each do |in_dir|
        StreamIO.open(in_dir, "r") do |in_fd|
          map_task.run in_fd, out_fd
        end
      end
    end

    system "sort <#{map_out_file} >#{red_in_file}"

    reduce_task = reduce_class.new(*reduce_args)
    reduce_task.declare
    reduce_task.prepare
    StreamIO.open(red_in_file, "r") do |in_fd|
      StreamIO.open(out_dir, "w") do |out_fd|
        reduce_task.run in_fd, out_fd
      end
    end
  end
end