Module: Wukong::Hadoop::HadoopInvocation
- Included in:
- Driver
- Defined in:
- lib/wukong-hadoop/driver/hadoop_invocation.rb
Overview
Provides methods for executing a map/reduce job on a Hadoop cluster via Hadoop streaming.
Instance Method Summary collapse
-
#ensure_input_and_output! ⇒ Object
Raise an error unless we have input and output.
-
#hadoop_commandline ⇒ String
Return the Hadoop command used to launch this job in a Hadoop cluster.
-
#hadoop_files ⇒ Object
:nodoc:.
-
#hadoop_jobconf_options ⇒ Array<String>
Return an array of jobconf (-D) options that will be passed to Hadoop.
-
#hadoop_other_args ⇒ String
Returns other arguments used by Hadoop streaming.
-
#hadoop_recycle_env ⇒ Object
:nodoc:.
-
#hadoop_runner ⇒ String
The name of the Hadoop binary to use.
-
#input_format ⇒ String
The input format to use.
-
#io_formats ⇒ Object
:nodoc:.
-
#java_opt(option, value) ⇒ Object
:nodoc:.
-
#job_name ⇒ String
The job name that will be passed to Hadoop.
-
#output_format ⇒ String
The output format to use.
-
#parsed_java_opts ⇒ Object
:nodoc:.
-
#remove_output_path! ⇒ Object
Remove the output path.
-
#ruby_interpreter_path ⇒ Object
:nodoc:.
-
#use_alternative_gemfile ⇒ Object
:nodoc:.
Instance Method Details
#ensure_input_and_output! ⇒ Object
Raise an error unless we have input and output.
10 11 12 |
# Raise an error unless explicit input and output paths were both
# supplied -- Hadoop mode cannot infer them.
#
# @raise [Error] if either --input or --output is missing/empty
def ensure_input_and_output!
  have_input  = !(input_paths.nil? || input_paths.empty?)
  have_output = !(output_path.nil? || output_path.empty?)
  return if have_input && have_output
  raise Error.new("Explicit --input and --output paths are required to run a job in Hadoop mode.")
end
#hadoop_commandline ⇒ String
Return the Hadoop command used to launch this job in a Hadoop cluster.
You should be able to copy, paste, and run this command unmodified when debugging.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# Return the Hadoop streaming command used to launch this job on a
# Hadoop cluster.  The pieces are joined with a trailing backslash so
# the command can be copied, pasted, and run unmodified when debugging.
#
# NOTE(review): the extraction of this snippet dropped the element
# between the streaming-jar argument and the job-name flag, leaving a
# bare comma; restored as `hadoop_jobconf_options` (the -D options
# documented on this module) -- confirm against the gem source.
#
# @return [String]
def hadoop_commandline
  [
    hadoop_runner,
    "jar #{settings[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar",
    hadoop_jobconf_options,
    "-D mapred.job.name='#{job_name}'",
    hadoop_other_args,
    hadoop_files,
    "-mapper '#{mapper_commandline}'",
    "-reducer '#{reducer_commandline}'",
    "-input '#{input_paths}'",
    "-output '#{output_path}'",
    io_formats,
    hadoop_recycle_env,
  ].flatten.compact.join(" \t\\\n ")
end
#hadoop_files ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# Build the -files/-libjars/-archives options for Hadoop streaming,
# distributing local files, jars, and archives to the cluster.
#
# Any .rb command-line args are first appended to settings[:files] so
# processor scripts travel with the job.  Local globs are expanded for
# :files and :jars but NOT for :archives (archive paths may live on the
# HDFS, where local globbing would be wrong).
#
# NOTE(review): the tap-block variable and the receiver of `<<` were
# lost in extraction; restored as a local accumulator -- confirm
# against the gem source.
#
# @return [Array<String>]
def hadoop_files
  args.find_all { |arg| arg.to_s =~ /\.rb$/ }.each do |arg|
    settings[:files] << arg
  end
  [].tap do |file_options|
    { :files => '-files ', :jars => '-libjars ', :archives => '-archives ' }.each_pair do |file_type_name, file_option_name|
      unless settings[file_type_name].nil? || settings[file_type_name].empty?
        files = settings[file_type_name].map do |file_name_or_glob|
          # Don't glob on the HDFS
          file_type_name == :archives ? file_name_or_glob : [Dir[file_name_or_glob], file_name_or_glob]
        end.flatten.compact.uniq.join(',')
        file_options << "#{file_option_name}'#{files}'"
      end
    end
  end
end
#hadoop_jobconf_options ⇒ Array<String>
Return an array of jobconf (-D) options that will be passed to Hadoop.
Translates the “friendly” wu-hadoop
names into the less-friendly Hadoop names.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# Return an array of jobconf (-D) options that will be passed to
# Hadoop, translating the "friendly" wu-hadoop names (e.g.
# --reduce_tasks) into the less-friendly Hadoop property names stored
# in each setting's definition.
#
# NOTE(review): the method name and the local accumulator were lost in
# extraction (`def =`, bare `+=`); restored as `hadoop_jobconf_options`
# / `jobconf_options` per the summary above -- confirm against the gem
# source.
#
# @return [Array<String>]
def hadoop_jobconf_options
  jobconf_options = []
  # Hadoop expects string values for these flags, not Ruby booleans.
  settings[:reuse_jvms]          = '-1'    if (settings[:reuse_jvms] == true)
  settings[:respect_exit_status] = 'false' if (settings[:ignore_exit_status] == true)
  # If no reducer and no reduce_command, then skip the reduce phase
  settings[:reduce_tasks] ||= 0 unless reduce?
  # Fields hadoop should use to distribute records to reducers
  unless settings[:partition_fields].blank?
    jobconf_options += [jobconf(:partition_fields), jobconf(:output_field_separator)]
  end
  jobconf_options += [
    :io_sort_mb, :io_sort_record_percent,
    :map_speculative, :map_tasks,
    :max_maps_per_cluster, :max_maps_per_node,
    :max_node_map_tasks, :max_node_reduce_tasks,
    :max_reduces_per_cluster, :max_reduces_per_node,
    :max_record_length, :min_split_size,
    :output_field_separator, :key_field_separator,
    :partition_fields, :sort_fields,
    :reduce_tasks, :respect_exit_status,
    :reuse_jvms, :timeout,
    :max_tracker_failures, :max_map_attempts,
    :max_reduce_attempts
  ].map do |opt|
    # The Hadoop property name lives in the setting's description.
    defn = settings.definition_of(opt, :description)
    val  = settings[opt]
    java_opt(defn, val)
  end
  jobconf_options.flatten.compact
end
#hadoop_other_args ⇒ String
Returns other arguments used by Hadoop streaming.
135 136 137 138 139 140 141 142 143 |
# Returns other arguments used by Hadoop streaming: the parsed
# --java_opts, the XML-splitting input reader, lazy output, and the
# key-field partitioner when partition fields were given.
#
# NOTE(review): the receiver inside the StreamXmlRecordReader
# interpolation was lost in extraction (`#{.split_on_xml_tag}`);
# restored as settings[:split_on_xml_tag], matching the surrounding
# condition -- confirm against the gem source.
#
# @return [Array<String>]
def hadoop_other_args
  extra_str_args = parsed_java_opts
  if settings[:split_on_xml_tag]
    extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{settings[:split_on_xml_tag]}>,end=</#{settings[:split_on_xml_tag]}>'}
  end
  extra_str_args << ' -lazyOutput' if settings[:noempty] # don't create reduce file if no records
  extra_str_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner' unless settings[:partition_fields].blank?
  extra_str_args
end
#hadoop_recycle_env ⇒ Object
:nodoc:
181 182 183 184 |
# Forward selected environment variables to the Hadoop tasks via
# -cmdenv, switching to the alternative Gemfile first when --gemfile
# was given.  Only variables actually set in the current environment
# are forwarded.
#
# @return [Array<String>]
def hadoop_recycle_env
  use_alternative_gemfile if settings[:gemfile]
  recycled = []
  %w[BUNDLE_GEMFILE].each do |var|
    recycled << %Q{-cmdenv '#{var}=#{ENV[var]}'} if ENV[var]
  end
  recycled
end
#hadoop_runner ⇒ String
The name of the Hadoop binary to use.
Respects the value of --hadoop_runner
if given.
91 92 93 |
# The name of the Hadoop binary to use.  Respects the value of
# --hadoop_runner if given, otherwise falls back to bin/hadoop under
# --hadoop_home.
#
# @return [String]
def hadoop_runner
  explicit = settings[:hadoop_runner]
  explicit || File.join(settings[:hadoop_home], 'bin/hadoop')
end
#input_format ⇒ String
The input format to use.
Respects the value of --input_format
.
66 67 68 |
# The input format to use, as given by --input_format (nil when
# unset).
#
# @return [String]
def input_format
  settings[:input_format]
end
#io_formats ⇒ Object
:nodoc:
80 81 82 83 84 |
# The -inputformat/-outputformat flags for Hadoop streaming.  Slots
# for unset formats are nil; the caller flattens and compacts the
# final command array, so nils are harmless here.
#
# @return [Array<String, nil>]
def io_formats
  input_flag  = input_format  ? "-inputformat '#{input_format}'"   : nil
  output_flag = output_format ? "-outputformat '#{output_format}'" : nil
  [input_flag, output_flag]
end
#java_opt(option, value) ⇒ Object
:nodoc:
194 195 196 |
# Format a single Hadoop -D property assignment, shell-escaping the
# value.  Returns nil when the value is unset so callers can compact
# it away.
#
# @param option [String] the Hadoop property name
# @param value [Object] the property value (converted with #to_s)
# @return [String, nil]
def java_opt option, value
  return unless value
  "-D #{option}=#{Shellwords.escape(value.to_s)}"
end
#job_name ⇒ String
The job name that will be passed to Hadoop.
Respects the --job_name
option if given, otherwise constructs one from the given processors, input, and output paths.
55 56 57 58 59 |
# The job name that will be passed to Hadoop.  Respects the --job_name
# option if given, otherwise constructs one from the given processors,
# input, and output paths, stripping characters Hadoop dislikes.
#
# @return [String]
def job_name
  explicit = settings[:job_name]
  return explicit if explicit
  basenames = args.compact.uniq.map { |path| File.basename(path, '.rb') }
  raw_name  = "#{basenames.join('-')}---#{input_paths}---#{output_path}"
  raw_name.gsub(%r{[^\w/\.\-\+]+}, '')
end
#output_format ⇒ String
The output format to use.
Respects the value of --output_format
.
75 76 77 |
# The output format to use, as given by --output_format (nil when
# unset).
#
# @return [String]
def output_format
  settings[:output_format]
end
#parsed_java_opts ⇒ Object
:nodoc:
187 188 189 190 191 |
# Split each raw --java_opts string on '-D' and normalize every
# fragment back into a separate "-D key=value" argument, discarding
# blank fragments.
#
# @return [Array<String>]
def parsed_java_opts
  settings[:java_opts].flat_map do |raw_opt|
    raw_opt.split('-D').reject { |fragment| fragment.blank? }.map { |fragment| '-D ' + fragment.strip }
  end
end
#remove_output_path! ⇒ Object
Remove the output path.
Will not actually do anything if the --dry_run
option is also given.
18 19 20 21 22 |
# Remove the output path via `hadoop fs -rmr` so the streaming job can
# write it fresh.  Logs the command; does not actually run it when the
# --dry_run option is also given.
def remove_output_path!
  cmd = %Q{#{hadoop_runner} fs -rmr '#{output_path}'}
  log.info "Removing output file #{output_path}: #{cmd}"
  return if settings[:dry_run]
  puts `#{cmd}`
end
#ruby_interpreter_path ⇒ Object
:nodoc:
171 172 173 |
# Absolute, symlink-resolved path to the running Ruby interpreter.
#
# Fixed: the original used the bare `Config::CONFIG`, which was
# deprecated in Ruby 1.9 and removed in 2.2; `RbConfig::CONFIG` is the
# supported constant and has been available since 1.9, so this is
# backward compatible.
#
# @return [Pathname]
def ruby_interpreter_path
  Pathname.new(
    File.join(
      RbConfig::CONFIG['bindir'],
      RbConfig::CONFIG['RUBY_INSTALL_NAME'] + RbConfig::CONFIG['EXEEXT']
    )
  ).realpath
end
#use_alternative_gemfile ⇒ Object
:nodoc:
176 177 178 |
# Point Bundler at the Gemfile given via --gemfile by setting
# BUNDLE_GEMFILE in this process's environment (later forwarded to
# Hadoop tasks via -cmdenv).
def use_alternative_gemfile
  gemfile = settings[:gemfile]
  ENV['BUNDLE_GEMFILE'] = gemfile
end