Class: Wukong::Hadoop::Driver

Inherits:
Object
Includes:
Logging, HadoopInvocation, InputsAndOutputs, LocalInvocation, MapLogic, ReduceLogic
Defined in:
lib/wukong-hadoop/driver.rb

Overview

The Hadoop::Driver class contains the logic to examine its arguments and construct the command lines it executes to produce the desired behavior.

The Hadoop::Driver will introspect on its arguments to guess (if not given) the processors to use as mapper and reducer in a map/reduce job. It will also decide whether to run that job in local or Hadoop mode. These decisions result in a command which it will ultimately execute.


Methods included from LocalInvocation

#cat_input, #cat_output, #local_commandline, #sort_commandline

Methods included from HadoopInvocation

#ensure_input_and_output!, #hadoop_commandline, #hadoop_files, #hadoop_jobconf_options, #hadoop_other_args, #hadoop_recycle_env, #hadoop_runner, #input_format, #io_formats, #java_opt, #job_name, #output_format, #parsed_java_opts, #remove_output_path!, #ruby_interpreter_path, #use_alternative_gemfile

Methods included from ReduceLogic

#explicit_reduce_command?, #explicit_reduce_processor?, #explicit_reducer?, #map_only?, #reduce?, #reducer_arg, #reducer_commandline, #reducer_name, #reducer_needs_run_arg?

Methods included from MapLogic

#explicit_map_command?, #explicit_map_processor?, #explicit_mapper?, #mapper_arg, #mapper_commandline, #mapper_name, #mapper_needs_run_arg?

Methods included from InputsAndOutputs

#input_paths, #output_path

Constructor Details

#initialize(settings, *args) ⇒ Driver

Initialize a new driver with the given settings and args.

Parameters:

  • settings (Configliere::Param)
  • args (Array<String>)


# File 'lib/wukong-hadoop/driver.rb', line 73

def initialize(settings, *args)
  @settings = settings
  self.args = args
end

Instance Attribute Details

#args ⇒ Object

The (processed) arguments for this driver.

Returns:

  • (Array<String, Pathname>)


# File 'lib/wukong-hadoop/driver.rb', line 37

def args
  @args
end

#settings ⇒ Object

The settings used by this driver.

Returns:

  • (Configliere::Param)


# File 'lib/wukong-hadoop/driver.rb', line 32

def settings
  @settings
end

Class Method Details

.run(settings, *extra_args) ⇒ Object

Initialize and run a new Wukong::Hadoop::Driver for the given settings.

Rescues any Wukong::Error by printing its message to STDERR and exiting with status 127.

Parameters:

  • settings (Configliere::Param)
  • extra_args (Array<String>)


# File 'lib/wukong-hadoop/driver.rb', line 47

def self.run(settings, *extra_args)
  begin
    new(settings, *extra_args).run!
  rescue Wukong::Error => e
    $stderr.puts e.message
    exit(127)
  end
end
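The rescue-and-exit behavior of `.run` can be illustrated without loading Wukong. This is a minimal plain-Ruby sketch: `SketchError` and `SketchDriver` are hypothetical stand-ins for `Wukong::Error` and `Wukong::Hadoop::Driver`, and the `fail_with` argument exists only to trigger the error path.

```ruby
# Hypothetical stand-in for Wukong::Error.
class SketchError < StandardError; end

# Hypothetical stand-in for Wukong::Hadoop::Driver: run! either succeeds
# or raises SketchError, so both paths through .run can be exercised.
class SketchDriver
  def initialize(fail_with = nil)
    @fail_with = fail_with
  end

  def run!
    raise SketchError, @fail_with if @fail_with
    :ok
  end

  # Mirrors Driver.run: build an instance, run it, and turn a known error
  # into a message on STDERR plus exit status 127.
  def self.run(*args)
    new(*args).run!
  rescue SketchError => e
    $stderr.puts e.message
    exit(127)
  end
end

SketchDriver.run  # => :ok
```

Because `exit` raises `SystemExit`, a caller (or a test) can still intercept the failure path and inspect its `status`.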

Instance Method Details

#command_prefix ⇒ String

The prefix to insert before all invocations of the wu-local runner.

Returns:

  • (String)


# File 'lib/wukong-hadoop/driver.rb', line 155

def command_prefix
  settings[:command_prefix]
end

#execute_command!(*args) ⇒ Object

Execute a command composed of the given parts.

If the --dry_run option was given, prints the command instead of executing it.

Parameters:

  • args (Array<String>)


# File 'lib/wukong-hadoop/driver.rb', line 178

def execute_command!(*args)
  command = args.flatten.reject(&:blank?).join(" \\\n    ")
  if settings[:dry_run]
    log.info("Dry run:")
    puts command
  else
    puts `#{command}`
    raise "Streaming command failed!" unless $?.success?
  end
end
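A minimal plain-Ruby sketch of how `execute_command!` assembles and runs its parts. It assumes no ActiveSupport, so instead of `#blank?` it drops `nil` and whitespace-only parts. `assemble_command` and `execute_sketch` are hypothetical names, and the dry-run flag is passed directly rather than read from `settings`.

```ruby
# Join the non-blank parts with a backslash, a newline and indentation, so
# a long command prints one part per line and stays valid shell.
# nil and whitespace-only parts are dropped (a plain-Ruby substitute for
# ActiveSupport's #blank?).
def assemble_command(*parts)
  parts.flatten.compact.map(&:to_s)
       .reject { |part| part.strip.empty? }
       .join(" \\\n    ")
end

# On a dry run, print and return the command; otherwise run it through the
# shell with backticks and raise if it exits with a non-zero status.
def execute_sketch(dry_run, *parts)
  command = assemble_command(*parts)
  if dry_run
    puts "Dry run:"
    puts command
    command
  else
    output = `#{command}`
    raise "Streaming command failed!" unless $?.success?
    output
  end
end
```

Since each line break is a backslash-newline continuation, `execute_sketch(false, "echo", "hi")` runs `echo hi` and returns `"hi\n"`. Unlike the original, the sketch returns the output instead of printing it, which makes it easier to inspect.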

#file_is_processor?(path) ⇒ true, false

Is a processor registered under the name derived from the given path (its basename, without any .rb extension)?

Parameters:

  • path (String)

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/driver.rb', line 147

def file_is_processor?(path)
  processor_registered?(processor_name_from_file(path))
end

#mode ⇒ :hadoop, :local

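What mode is this driver in? Returns :local only when the mode setting is "local"; any other value, including no value at all, yields :hadoop.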

Returns:

  • (:hadoop, :local)


# File 'lib/wukong-hadoop/driver.rb', line 109

def mode
  settings[:mode].to_s == 'local' ? :local : :hadoop
end
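The comparison in `mode` is a plain string match, so only the value `local` (as a String or Symbol) selects local mode. Anything else, including a missing setting or different capitalization, falls back to Hadoop. A standalone sketch, where `mode_for` is a hypothetical helper that takes the raw setting value:

```ruby
# Same test as Driver#mode, applied to a raw settings value.
def mode_for(setting)
  setting.to_s == 'local' ? :local : :hadoop
end

mode_for('local')  # => :local
mode_for(:local)   # => :local
mode_for(nil)      # => :hadoop
mode_for('Local')  # => :hadoop (the match is case-sensitive)
```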

#params_to_pass ⇒ String

Returns parameters to pass to an invocation of wu-local.

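Parameters relevant to Wukong-Hadoop itself, like --reduce_tasks, are interpreted by the driver and not passed. All others are passed unmodified, with their values shell-escaped. If a deploy pack is loaded, the parameters come from its pre-deploy settings instead of this driver's settings.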

Returns:

  • (String)


# File 'lib/wukong-hadoop/driver.rb', line 167

def params_to_pass
  s = (Wukong.loaded_deploy_pack? ? Deploy.pre_deploy_settings : settings)
  s.reject{ |param, val| s.definition_of(param, :wukong_hadoop) }.map{ |param,val| "--#{param}=#{Shellwords.escape(val.to_s)}" }.join(" ")
end
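The filtering in `params_to_pass` can be sketched with an ordinary Hash. In the real method, a parameter is driver-owned when `definition_of(param, :wukong_hadoop)` is truthy. Here that check is replaced by a hypothetical list of keys, and `params_to_pass_sketch` is likewise an illustrative name.

```ruby
require 'shellwords'

# Drop the parameters the driver consumes itself, then render the rest as
# --name=value flags with shell-escaped values.
def params_to_pass_sketch(settings, driver_params)
  settings.reject { |param, _val| driver_params.include?(param) }
          .map { |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" }
          .join(" ")
end

settings = { reduce_tasks: 10, mode: 'local', label: 'two words', limit: 5 }
params_to_pass_sketch(settings, [:reduce_tasks, :mode])
# => "--label=two\\ words --limit=5"
```

Escaping matters because the result is spliced into a shell command line. Without it, `two words` would reach wu-local as two separate arguments.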

#processor_name_from_file(path) ⇒ String

Return the guessed name of a processor at the given path.

Parameters:

  • path (String)

Returns:

  • (String)


# File 'lib/wukong-hadoop/driver.rb', line 139

def processor_name_from_file(path)
  File.basename(path, '.rb')
end
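With a suffix argument, `File.basename` strips the directory and a trailing `.rb` and leaves other extensions alone. A few examples with hypothetical paths:

```ruby
File.basename('/home/user/jobs/tokenizer.rb', '.rb')  # => "tokenizer"
File.basename('tokenizer.rb', '.rb')                  # => "tokenizer"
File.basename('jobs/tokenizer', '.rb')                # => "tokenizer"
File.basename('jobs/tokenizer.py', '.rb')             # => "tokenizer.py"
```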

#processor_registered?(name) ⇒ true, false

Is there a processor registered with the given name?

Parameters:

  • name (#to_s)

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/driver.rb', line 131

def processor_registered? name
  Wukong.registry.registered?(name.to_s.to_sym)
end
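Together, `processor_name_from_file`, `processor_registered?` and `file_is_processor?` decide whether a file argument names a processor. Here is a plain-Ruby sketch that assumes a hypothetical `SKETCH_REGISTRY` set of symbols in place of `Wukong.registry`:

```ruby
require 'set'

# Hypothetical stand-in for Wukong.registry: the names of known processors.
SKETCH_REGISTRY = Set[:tokenizer, :counter]

def processor_name_from_file(path)
  File.basename(path, '.rb')
end

# Normalize to a Symbol, as processor_registered? does, so Strings and
# Symbols are looked up the same way.
def processor_registered?(name)
  SKETCH_REGISTRY.include?(name.to_s.to_sym)
end

def file_is_processor?(path)
  processor_registered?(processor_name_from_file(path))
end

file_is_processor?('jobs/tokenizer.rb')  # => true
file_is_processor?('jobs/helpers.rb')    # => false
```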

#run! ⇒ Object

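Run this driver.

In local mode, execute the local command line. In Hadoop mode, call ensure_input_and_output!, remove the output path if the --rm or --overwrite option was given, and then execute the Hadoop command line.
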
# File 'lib/wukong-hadoop/driver.rb', line 57

def run!
  if mode == :local
    # log.info "Launching local!"
    execute_command!(local_commandline)
  else
    ensure_input_and_output!
    remove_output_path! if settings[:rm] || settings[:overwrite]
    log.info "Launching Hadoop!"
    execute_command!(hadoop_commandline)
  end
end
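The order of operations in `run!` can be traced with a stub. In this sketch the command-building and file-system steps are replaced by hypothetical entries recorded in a `calls` array, so only the branching taken from `run!` and `mode` is real. `RunSketch` is an illustrative name.

```ruby
# Records which steps run! would take instead of performing them.
class RunSketch
  attr_reader :calls

  def initialize(settings)
    @settings = settings
    @calls = []
  end

  # Same rule as Driver#mode.
  def mode
    @settings[:mode].to_s == 'local' ? :local : :hadoop
  end

  # Same branching as Driver#run!, with each step recorded as a Symbol.
  def run!
    if mode == :local
      @calls << :local_commandline
    else
      @calls << :ensure_input_and_output!
      @calls << :remove_output_path! if @settings[:rm] || @settings[:overwrite]
      @calls << :hadoop_commandline
    end
    @calls
  end
end

RunSketch.new({ mode: 'local' }).run!
# => [:local_commandline]
RunSketch.new({ overwrite: true }).run!
# => [:ensure_input_and_output!, :remove_output_path!, :hadoop_commandline]
```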

#separate_map_and_reduce_args? ⇒ true, false

Were mapper and/or reducer named by separate arguments?

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/driver.rb', line 123

def separate_map_and_reduce_args?
  args.size == 2
end

#single_job_arg? ⇒ true, false

Were mapper and/or reducer named by a single argument?

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/driver.rb', line 116

def single_job_arg?
  args.size == 1
end
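`single_job_arg?` and `separate_map_and_reduce_args?` look only at how many positional arguments remain. The sketch below shows the resulting classification. `arg_style`, its return symbols and the example arguments are hypothetical. With zero arguments, or three or more, both predicates return false.

```ruby
# Classify positional arguments the way the two predicates do.
def arg_style(args)
  case args.size
  when 1 then :single_job_arg                # single_job_arg? is true
  when 2 then :separate_map_and_reduce_args  # separate_map_and_reduce_args? is true
  else :neither                              # both predicates are false
  end
end

arg_style(['wordcount.rb'])          # => :single_job_arg
arg_style(['tokenizer', 'counter'])  # => :separate_map_and_reduce_args
arg_style([])                        # => :neither
```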