Class: Wukong::Hadoop::Driver
- Inherits: Driver
  - Object
  - Driver
  - Wukong::Hadoop::Driver
- Includes: Logging, HadoopInvocation, InputsAndOutputs, LocalInvocation, MapLogic, ReduceLogic
- Defined in: lib/wukong-hadoop/driver.rb
Overview
The Hadoop::Driver class contains the logic to examine its arguments and construct the command lines it executes to produce the desired behavior.
The Hadoop::Driver inspects its arguments to guess which processors to use as the mapper and reducer of a map/reduce job, when these are not given explicitly. It also decides whether to run that job in local or Hadoop mode. These decisions produce a command, which the driver then executes.
Instance Attribute Summary
- #args ⇒ Object
  The (processed) arguments for this driver.
- #settings ⇒ Object
  The settings used by this driver.
Class Method Summary
- .run(settings, *extra_args) ⇒ Object
  Initialize and run a new Wukong::Hadoop::Driver for the given settings.
Instance Method Summary
- #command_prefix ⇒ String
  The prefix to insert before all invocations of the wu-local runner.
- #execute_command!(*args) ⇒ Object
  Execute a command composed of the given parts.
- #file_is_processor?(path) ⇒ true, false
  Does the given path contain a processor named after itself?
- #initialize(settings, *args) ⇒ Driver (constructor)
  Initialize a new driver with the given settings and args.
- #mode ⇒ :hadoop, :local
  What mode is this driver in?
- #params_to_pass ⇒ String
  Returns parameters to pass to an invocation of wu-local.
- #processor_name_from_file(path) ⇒ String
  Return the guessed name of a processor at the given path.
- #processor_registered?(name) ⇒ true, false
  Is there a processor registered with the given name?
- #run! ⇒ Object
  Run this driver.
- #separate_map_and_reduce_args? ⇒ true, false
  Were mapper and/or reducer named by separate arguments?
- #single_job_arg? ⇒ true, false
  Were mapper and/or reducer named by a single argument?
Methods included from LocalInvocation
#cat_input, #cat_output, #local_commandline, #sort_commandline
Methods included from HadoopInvocation
#ensure_input_and_output!, #hadoop_commandline, #hadoop_files, #hadoop_jobconf_options, #hadoop_other_args, #hadoop_recycle_env, #hadoop_runner, #input_format, #io_formats, #java_opt, #job_name, #output_format, #parsed_java_opts, #remove_output_path!, #ruby_interpreter_path, #use_alternative_gemfile
Methods included from ReduceLogic
#explicit_reduce_command?, #explicit_reduce_processor?, #explicit_reducer?, #map_only?, #reduce?, #reducer_arg, #reducer_commandline, #reducer_name, #reducer_needs_run_arg?
Methods included from MapLogic
#explicit_map_command?, #explicit_map_processor?, #explicit_mapper?, #mapper_arg, #mapper_commandline, #mapper_name, #mapper_needs_run_arg?
Methods included from InputsAndOutputs
Constructor Details
#initialize(settings, *args) ⇒ Driver
Initialize a new driver with the given settings and args.
# File 'lib/wukong-hadoop/driver.rb', line 73
def initialize(settings, *args)
  @settings = settings
  self.args = args
end
Instance Attribute Details
#args ⇒ Object
The (processed) arguments for this driver.
# File 'lib/wukong-hadoop/driver.rb', line 37
def args
  @args
end
#settings ⇒ Object
The settings used by this driver.
# File 'lib/wukong-hadoop/driver.rb', line 32
def settings
  @settings
end
Class Method Details
.run(settings, *extra_args) ⇒ Object
Initialize and run a new Wukong::Hadoop::Driver for the given settings.
Rescues all Wukong::Error exceptions by printing the error message to STDERR and exiting with status 127.
# File 'lib/wukong-hadoop/driver.rb', line 47
def self.run(settings, *extra_args)
  begin
    new(settings, *extra_args).run!
  rescue Wukong::Error => e
    $stderr.puts e.message
    exit(127)
  end
end
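A minimal sketch of the rescue-and-exit pattern that .run uses. DemoError and DemoDriver are hypothetical stand-ins for Wukong::Error and this class, and DemoDriver#run! merely returns a string (or fails) instead of building and executing commands. The method-level rescue behaves the same as the explicit begin/rescue above.

```ruby
# Hypothetical stand-in for Wukong::Error.
class DemoError < StandardError; end

# Hypothetical stand-in for Wukong::Hadoop::Driver.
class DemoDriver
  def self.run(settings, *extra_args)
    new(settings, *extra_args).run!
  rescue DemoError => e
    $stderr.puts e.message # print only the message, no backtrace
    exit(127)              # raises SystemExit with status 127
  end

  def initialize(settings, *args)
    @settings = settings
    @args     = args
  end

  # Fails the way a misconfigured job might; otherwise reports its args.
  def run!
    raise DemoError, "No mapper given" if @args.empty?
    "ran with #{@args.join(', ')}"
  end
end
```

Because exit raises SystemExit, a caller (or a test) can still observe the exit status instead of the process terminating immediately.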
Instance Method Details
#command_prefix ⇒ String
The prefix to insert before all invocations of the wu-local runner, taken from the :command_prefix setting.
# File 'lib/wukong-hadoop/driver.rb', line 155
def command_prefix
  settings[:command_prefix]
end
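A sketch of what "inserting a prefix before wu-local" could look like. The :command_prefix key comes from the method above; the helper name wu_local_invocation and the way the prefix is joined onto the command are assumptions for illustration, not how LocalInvocation actually assembles its command line.

```ruby
# Hypothetical helper: prepend an optional prefix (e.g. "bundle exec")
# to a wu-local invocation, skipping the prefix when it is missing or blank.
def wu_local_invocation(settings, processor)
  parts = [settings[:command_prefix], "wu-local", processor]
  parts.reject { |part| part.nil? || part.to_s.strip.empty? }.join(" ")
end
```

Usage: wu_local_invocation({ command_prefix: "bundle exec" }, "tokenizer") gives "bundle exec wu-local tokenizer", while an empty settings Hash gives "wu-local tokenizer".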
#execute_command!(*args) ⇒ Object
Execute a command composed of the given parts.
Blank parts are dropped and the remaining parts are joined into a single multi-line shell command. If the --dry_run option was given, the command is printed instead of executed; otherwise the command's output is printed and a non-zero exit status raises an error.
# File 'lib/wukong-hadoop/driver.rb', line 178
def execute_command!(*args)
  command = args.flatten.reject(&:blank?).join(" \\\n ")
  if settings[:dry_run]
    log.info("Dry run:")
    puts command
  else
    puts `#{command}`
    raise "Streaming command failed!" unless $?.success?
  end
end
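A plain-Ruby sketch of the same flow, runnable without ActiveSupport or a settings object. Assumptions: blank_part? replaces ActiveSupport's blank?, a dry_run: keyword replaces settings[:dry_run], and the method returns the command or its output (the original returns the result of puts) so the behavior is easy to check.

```ruby
# Stand-in for ActiveSupport's Object#blank? on the nil/String parts used here.
def blank_part?(part)
  part.nil? || part.to_s.strip.empty?
end

# Flatten nested parts, drop blank ones, and join them with shell line
# continuations so a dry run prints one readable multi-line command.
def build_command(*parts)
  parts.flatten.reject { |part| blank_part?(part) }.join(" \\\n  ")
end

# Print the command on a dry run; otherwise run it through the shell,
# print its output, and raise if it exits non-zero.
def execute_command!(*parts, dry_run: false)
  command = build_command(*parts)
  if dry_run
    puts "Dry run:"
    puts command
    command
  else
    output = `#{command}`
    raise "Streaming command failed!" unless $?.success?
    puts output
    output
  end
end
```

The backslash-newline separators are harmless when the command actually runs: the shell treats them as line continuations, so "echo \<newline>  hi" executes as echo hi.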
#file_is_processor?(path) ⇒ true, false
Does the given path contain a processor named after itself?
# File 'lib/wukong-hadoop/driver.rb', line 147
def file_is_processor?(path)
  processor_registered?(processor_name_from_file(path))
end
#mode ⇒ :hadoop, :local
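What mode is this driver in?
Returns :local only when the :mode setting is 'local' (as a String or Symbol); any other value, including no value at all, gives :hadoop.
# File 'lib/wukong-hadoop/driver.rb', line 109
def mode
  settings[:mode].to_s == 'local' ? :local : :hadoop
end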
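The rule in #mode can be checked with a standalone copy that takes a plain Hash in place of the settings object (driver_mode is a hypothetical name). Note that the comparison is case-sensitive, so 'LOCAL' falls through to Hadoop mode.

```ruby
# Same test as #mode: only 'local' (String or Symbol) selects local mode.
def driver_mode(settings)
  settings[:mode].to_s == 'local' ? :local : :hadoop
end
```

For example, driver_mode({ mode: :local }) and driver_mode({ mode: 'local' }) both return :local, while driver_mode({}) returns :hadoop because nil.to_s is the empty string.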
#params_to_pass ⇒ String
Returns parameters to pass to an invocation of wu-local.
Parameters such as --reduce_tasks that are relevant to Wukong-Hadoop are interpreted by the driver and not passed on. All others are passed unmodified as shell-escaped --param=value pairs.
# File 'lib/wukong-hadoop/driver.rb', line 167
def params_to_pass
  s = (Wukong.loaded_deploy_pack? ? Deploy.pre_deploy_settings : settings)
  s.reject { |param, val| s.definition_of(param, :wukong_hadoop) }
   .map    { |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" }
   .join(" ")
end
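A minimal sketch of the filtering and escaping step. Assumption: instead of asking the settings object whether a parameter is defined for :wukong_hadoop (s.definition_of above), the hypothetical hadoop_params argument lists the names Wukong-Hadoop owns, and a plain Hash stands in for the settings.

```ruby
require 'shellwords'

# Drop parameters the driver consumes itself, then render the rest as
# shell-escaped --param=value pairs for wu-local.
def params_to_pass(settings, hadoop_params)
  settings
    .reject { |param, _val| hadoop_params.include?(param) }
    .map    { |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" }
    .join(" ")
end
```

Usage: params_to_pass({ reduce_tasks: 3, pattern: "a b", limit: 10 }, [:reduce_tasks]) drops reduce_tasks and escapes the space in "a b", yielding --pattern=a\ b --limit=10.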
#processor_name_from_file(path) ⇒ String
Return the guessed name of a processor at the given path.
# File 'lib/wukong-hadoop/driver.rb', line 139
def processor_name_from_file(path)
  File.basename(path, '.rb')
end
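The guess relies entirely on File.basename: it strips the directory and, because '.rb' is passed as the suffix, that exact extension. Other extensions are left in place, so a non-Ruby file yields a name that is unlikely to match a registered processor.

```ruby
# Same logic as #processor_name_from_file.
def processor_name_from_file(path)
  File.basename(path, '.rb')
end
```

For example, "examples/tokenizer.rb" and plain "tokenizer" both give "tokenizer", while "scripts/tokenizer.py" gives "tokenizer.py".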
#processor_registered?(name) ⇒ true, false
Is there a processor registered with the given name?
# File 'lib/wukong-hadoop/driver.rb', line 131
def processor_registered? name
  Wukong.registry.registered?(name.to_s.to_sym)
end
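Together, #processor_registered? and #file_is_processor? implement the "file named after its processor" convention. This sketch assumes a hypothetical REGISTRY Set of Symbols in place of Wukong.registry; the name normalization (to_s.to_sym) matches the original, so Strings and Symbols are treated alike.

```ruby
require 'set'

# Hypothetical registry standing in for Wukong.registry.
REGISTRY = Set[:tokenizer, :counter]

def processor_registered?(name)
  REGISTRY.include?(name.to_s.to_sym) # accepts Strings or Symbols
end

# A file "is a processor" when its basename (minus .rb) is registered.
def file_is_processor?(path)
  processor_registered?(File.basename(path, '.rb'))
end
```

With this registry, file_is_processor?("app/tokenizer.rb") is true and file_is_processor?("app/helpers.rb") is false.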
#run! ⇒ Object
Run this driver.
# File 'lib/wukong-hadoop/driver.rb', line 57
def run!
  if mode == :local
    # log.info "Launching local!"
    execute_command!(local_commandline)
  else
    ensure_input_and_output!
    remove_output_path! if settings[:rm] || settings[:overwrite]
    log.info "Launching Hadoop!"
    execute_command!(hadoop_commandline)
  end
end
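The dispatch in #run! can be traced with a hypothetical stand-in, DispatchSketch, that records the steps it would take instead of building and executing real wu-local or Hadoop command lines. The step names are illustrative labels for the calls above.

```ruby
# Records the order of operations #run! performs for a given settings Hash.
class DispatchSketch
  attr_reader :steps

  def initialize(settings)
    @settings = settings
    @steps    = []
  end

  def mode
    @settings[:mode].to_s == 'local' ? :local : :hadoop
  end

  def run!
    if mode == :local
      @steps << :execute_local
    else
      @steps << :ensure_input_and_output
      @steps << :remove_output_path if @settings[:rm] || @settings[:overwrite]
      @steps << :execute_hadoop
    end
    @steps
  end
end
```

Local mode skips the input/output checks entirely; in Hadoop mode the output path is removed only when :rm or :overwrite is set.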
#separate_map_and_reduce_args? ⇒ true, false
Were mapper and/or reducer named by separate arguments?
# File 'lib/wukong-hadoop/driver.rb', line 123
def separate_map_and_reduce_args?
  args.size == 2
end
#single_job_arg? ⇒ true, false
Were mapper and/or reducer named by a single argument?
# File 'lib/wukong-hadoop/driver.rb', line 116
def single_job_arg?
  args.size == 1
end
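Both predicates classify the job purely by counting the (processed) arguments. The hypothetical helper below folds them into one method; the comments about what each argument typically names (one file defining both mapper and reducer, or one file each) are illustrative assumptions.

```ruby
# Classify positional args the way #single_job_arg? and
# #separate_map_and_reduce_args? do: by their count alone.
def job_arg_style(args)
  case args.size
  when 1 then :single_job_arg          # e.g. one file naming the job
  when 2 then :separate_map_and_reduce # e.g. a mapper and a reducer
  else :unrecognized
  end
end
```

For example, job_arg_style(["word_count.rb"]) returns :single_job_arg and job_arg_style(["map.rb", "reduce.rb"]) returns :separate_map_and_reduce.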