Module: Wukong::Hadoop::HadoopInvocation

Included in:
Driver
Defined in:
lib/wukong-hadoop/driver/hadoop_invocation.rb

Overview

Provides methods for executing a map/reduce job on a Hadoop cluster via Hadoop streaming.

Instance Method Summary

Instance Method Details

#ensure_input_and_output! ⇒ Object

Raise an error unless we have input and output.

Raises:

  • (Error)


10
11
12
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 10

# Abort early when the job has nowhere to read from or nowhere to write to.
#
# Both `input_paths` and `output_path` must be non-nil and non-empty.
#
# @raise [Error] if the input paths or the output path are missing
# @return [nil] when both are present
def ensure_input_and_output!
  input_given  = !(input_paths.nil? || input_paths.empty?)
  # Only look at the output path once the input is known to be usable.
  output_given = input_given && !(output_path.nil? || output_path.empty?)
  return if output_given

  raise Error, "Explicit --input and --output paths are required to run a job in Hadoop mode."
end

#hadoop_commandline ⇒ String

Return the Hadoop command used to launch this job in a Hadoop cluster.

You should be able to copy, paste, and run this command unmodified when debugging.

Returns:

  • (String)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 31

# Assemble the full Hadoop streaming command line for this job.
#
# Each piece is collected in order; nested arrays are flattened and nil
# entries dropped.  The pieces are then joined with a backslash-newline
# continuation so the result spans several lines.
#
# @return [String] the command line
def hadoop_commandline
  pieces = []
  pieces << hadoop_runner
  pieces << "jar #{settings[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar"
  pieces << hadoop_jobconf_options
  pieces << "-D mapred.job.name='#{job_name}'"
  pieces << hadoop_other_args
  pieces << hadoop_files
  pieces << "-mapper       '#{mapper_commandline}'"
  pieces << "-reducer      '#{reducer_commandline}'"
  pieces << "-input        '#{input_paths}'"
  pieces << "-output       '#{output_path}'"
  pieces << io_formats
  pieces << hadoop_recycle_env

  line_continuation = " \t\\\n  "
  pieces.flatten.compact.join(line_continuation)
end

#hadoop_files ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 148

# Build the -files / -libjars / -archives options for Hadoop streaming.
#
# Every argument in `args` whose string form ends in `.rb` is first appended
# to `settings[:files]` (this mutates the settings).  Then, for each of
# :files, :jars and :archives that is set and non-empty, one option string
# holding a comma-separated, de-duplicated list is produced.
#
# NOTE(review): assumes settings[:files] is an Array whenever a `.rb`
# argument is present -- confirm against the settings defaults.
#
# @return [Array<String>] zero to three option strings
def hadoop_files
  ruby_scripts = args.find_all { |candidate| candidate.to_s =~ /\.rb$/ }
  ruby_scripts.each { |script| settings[:files] << script }

  option_flags = {
    :files    => '-files        ',
    :jars     => '-libjars      ',
    :archives => '-archives     '
  }

  option_flags.each_with_object([]) do |(kind, flag), collected|
    entries = settings[kind]
    next if entries.nil? || entries.empty?

    expanded = entries.map do |entry|
      if kind == :archives
        # Don't glob on the HDFS
        entry
      else
        # Keep the local glob matches as well as the literal name itself.
        [Dir[entry], entry]
      end
    end
    collected << "#{flag}'#{expanded.flatten.compact.uniq.join(',')}'"
  end
end

#hadoop_jobconf_options ⇒ Array<String>

Return an array of jobconf (-D) options that will be passed to Hadoop.

Translates the “friendly” wu-hadoop names into the less-friendly Hadoop names.

Returns:

  • (Array<String>)


101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 101

# Collect the jobconf (-D) options that will be handed to Hadoop.
#
# Normalizes a few settings in place first, then maps each friendly setting
# name to a Java option via `settings.definition_of(name, :description)` and
# `java_opt`.  Nil entries are removed from the result.
#
# @return [Array<String>] the jobconf options
def hadoop_jobconf_options
  # Boolean switches are rewritten to the literal values passed on to Hadoop.
  settings[:reuse_jvms]          = '-1'    if settings[:reuse_jvms] == true
  settings[:respect_exit_status] = 'false' if settings[:ignore_exit_status] == true
  # If no reducer and no reduce_command, then skip the reduce phase
  settings[:reduce_tasks] ||= 0 unless reduce?

  # Fields hadoop should use to distribute records to reducers
  partition_options =
    if settings[:partition_fields].blank?
      []
    else
      [jobconf(:partition_fields), jobconf(:output_field_separator)]
    end

  friendly_names = [
    :io_sort_mb,               :io_sort_record_percent,
    :map_speculative,          :map_tasks,
    :max_maps_per_cluster,     :max_maps_per_node,
    :max_node_map_tasks,       :max_node_reduce_tasks,
    :max_reduces_per_cluster,  :max_reduces_per_node,
    :max_record_length,        :min_split_size,
    :output_field_separator,   :key_field_separator,
    :partition_fields,         :sort_fields,
    :reduce_tasks,             :respect_exit_status,
    :reuse_jvms,               :timeout,
    :max_tracker_failures,     :max_map_attempts,
    :max_reduce_attempts
  ]

  translated = friendly_names.map do |name|
    java_opt(settings.definition_of(name, :description), settings[name])
  end

  (partition_options + translated).flatten.compact
end

#hadoop_other_args ⇒ Array<String>

Returns other arguments used by Hadoop streaming.

Returns:

  • (Array<String>)


135
136
137
138
139
140
141
142
143
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 135

# Extra arguments for Hadoop streaming that are not plain jobconf options.
#
# Starts from the list returned by `parsed_java_opts` and appends:
#
# * an `-inputreader` clause when `settings[:split_on_xml_tag]` is set,
# * `-lazyOutput` when `settings[:noempty]` is set,
# * the key-field-based partitioner when `settings[:partition_fields]` is
#   not blank.
#
# The XML tag is read from `settings`, the same source as the guard, rather
# than from a separate `options` object.
#
# @return [Array<String>] the list from `parsed_java_opts` with extras appended
def hadoop_other_args
  extra_str_args = parsed_java_opts
  xml_tag = settings[:split_on_xml_tag]
  if xml_tag
    extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{xml_tag}>,end=</#{xml_tag}>'}
  end
  # don't create reduce file if no records
  extra_str_args << ' -lazyOutput' if settings[:noempty]
  unless settings[:partition_fields].blank?
    extra_str_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
  end
  extra_str_args
end

#hadoop_recycle_env ⇒ Object

:nodoc:



181
182
183
184
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 181

# Build `-cmdenv` options for the environment variables that are passed
# along to the job (currently only BUNDLE_GEMFILE).
#
# When `settings[:gemfile]` is set, `use_alternative_gemfile` is called
# first.  Variables that are unset in ENV are skipped.
#
# @return [Array<String>] one `-cmdenv` option per variable that is set
def hadoop_recycle_env
  use_alternative_gemfile if settings[:gemfile]

  %w[BUNDLE_GEMFILE].each_with_object([]) do |name, cmdenv_options|
    value = ENV[name]
    cmdenv_options << %Q{-cmdenv       '#{name}=#{value}'} if value
  end
end

#hadoop_runner ⇒ String

The name of the Hadoop binary to use.

Respects the value of --hadoop_runner if given.

Returns:

  • (String)


91
92
93
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 91

# Path of the Hadoop executable used to launch jobs.
#
# An explicit `settings[:hadoop_runner]` wins; otherwise the path is
# `bin/hadoop` underneath `settings[:hadoop_home]`.
#
# @return [String] the Hadoop binary to invoke
def hadoop_runner
  explicit_runner = settings[:hadoop_runner]
  return explicit_runner if explicit_runner

  File.join(settings[:hadoop_home], 'bin/hadoop')
end

#input_format ⇒ String

The input format to use.

Respects the value of --input_format.

Returns:

  • (String)


66
67
68
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 66

# The input format configured for this job.
#
# @return [Object] whatever is stored under `settings[:input_format]`
def input_format
  configured_input_format = settings[:input_format]
  configured_input_format
end

#io_formats ⇒ Object

:nodoc:



80
81
82
83
84
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 80

# The -inputformat / -outputformat options for Hadoop streaming.
#
# @return [Array<String, nil>] a two-element array (input option first);
#   an element is nil when the corresponding format is not set
def io_formats
  [
    (input_format  ? "-inputformat  '#{input_format}'"  : nil),
    (output_format ? "-outputformat '#{output_format}'" : nil)
  ]
end

#java_opt(option, value) ⇒ Object

:nodoc:



194
195
196
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 194

# Format a single Java-style `-D name=value` option.
#
# The value is converted with `to_s` and shell-escaped.
#
# @param option [Object] the property name, interpolated as-is
# @param value [Object] the property value
# @return [String, nil] the option string, or nil when `value` is nil or false
def java_opt(option, value)
  return unless value

  format("-D %s=%s", option, Shellwords.escape(value.to_s))
end

#job_name ⇒ String

The job name that will be passed to Hadoop.

Respects the --job_name option if given, otherwise constructs one from the given processors, input, and output paths.

Returns:

  • (String)


55
56
57
58
59
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 55

# The name under which this job is submitted to Hadoop.
#
# An explicit `settings[:job_name]` is returned untouched.  Otherwise the
# name is built from the basenames (minus `.rb`) of the unique, non-nil
# `args`, followed by the input paths and the output path.  Every run of
# characters other than word characters, `/`, `.`, `-` and `+` is removed.
#
# @return [String] the job name
def job_name
  explicit_name = settings[:job_name]
  return explicit_name if explicit_name

  script_stems = args.compact.uniq.map { |path| File.basename(path, '.rb') }
  unsanitized  = "#{script_stems.join('-')}---#{input_paths}---#{output_path}"
  unsanitized.gsub(%r{[^\w/\.\-\+]+}, '')
end

#output_format ⇒ String

The output format to use.

Respects the value of --output_format.

Returns:

  • (String)


75
76
77
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 75

# The output format configured for this job.
#
# @return [Object] whatever is stored under `settings[:output_format]`
def output_format
  configured_output_format = settings[:output_format]
  configured_output_format
end

#parsed_java_opts ⇒ Object

:nodoc:



187
188
189
190
191
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 187

# Normalize the raw strings in `settings[:java_opts]` into individual
# `-D key=value` options.
#
# Each raw string is split on the literal `-D`; blank fragments are dropped
# and the remaining ones are stripped and prefixed with `-D `.
#
# @return [Array<String>] one entry per option found
def parsed_java_opts
  settings[:java_opts].flat_map do |raw_opts|
    fragments = raw_opts.split('-D').reject { |fragment| fragment.blank? }
    fragments.map { |fragment| '-D ' + fragment.strip }
  end
end

#remove_output_path! ⇒ Object

Remove the output path.

Logs the removal command but does not execute it if the --dry_run option is also given.



18
19
20
21
22
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 18

# Delete the job's output path via `<hadoop_runner> fs -rmr`.
#
# The command is always logged.  It is executed (and its output printed)
# only when `settings[:dry_run]` is not set.
#
# @return [nil]
def remove_output_path!
  removal_command = "#{hadoop_runner} fs -rmr '#{output_path}'"
  log.info("Removing output file #{output_path}: #{removal_command}")
  return if settings[:dry_run]

  puts `#{removal_command}`
end

#ruby_interpreter_path ⇒ Object

:nodoc:



171
172
173
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 171

# Resolved path of the Ruby interpreter described by RbConfig.
#
# Joins RbConfig's `bindir` with `RUBY_INSTALL_NAME` + `EXEEXT` and resolves
# the result with `Pathname#realpath`.
#
# Uses `RbConfig::CONFIG`: the legacy `Config` constant was removed in
# Ruby 2.2, so `Config::CONFIG` raises NameError on current interpreters.
#
# @return [Pathname] the real path of the interpreter executable
# @raise [SystemCallError] from `Pathname#realpath` if the path does not exist
def ruby_interpreter_path
  ruby_config = RbConfig::CONFIG
  executable  = ruby_config['RUBY_INSTALL_NAME'] + ruby_config['EXEEXT']
  Pathname.new(File.join(ruby_config['bindir'], executable)).realpath
end

#use_alternative_gemfile ⇒ Object

:nodoc:



176
177
178
# File 'lib/wukong-hadoop/driver/hadoop_invocation.rb', line 176

# Point Bundler at the gemfile named by `settings[:gemfile]` by setting
# the BUNDLE_GEMFILE environment variable of the current process.
#
# @return [Object] the value of `settings[:gemfile]`
def use_alternative_gemfile
  alternative_gemfile = settings[:gemfile]
  ENV['BUNDLE_GEMFILE'] = alternative_gemfile
end