Class: Runner

Inherits:
Object
Defined in:
lib/jmapreduce/runner.rb

Constant Summary

JAVA_MAIN_CLASS =
'org.fingertap.jmapreduce.JMapReduce'

Instance Method Summary

Constructor Details

#initialize(script, input, output, opts = {}) ⇒ Runner

Returns a new instance of Runner.



4
5
6
7
8
9
10
11
12
# File 'lib/jmapreduce/runner.rb', line 4

# Builds a runner for one JMapReduce job and validates the Hadoop
# environment up front.
#
# @param script [String] path to the job script; shipped with -files and
#   passed (by basename) to the Java main class
# @param input [String] job input path, passed through verbatim
# @param output [String] job output path, passed through verbatim
# @param opts [Hash] optional settings; keys read by this class are
#   :files, :dirs, :libjars, :conf, :namenode, :jobtracker, :properties, :json
# @raise [RuntimeError] if HADOOP_HOME is unset or no hadoop command is found
def initialize(script, input, output, opts={})
  @script = script
  @input = input
  @output = output
  @opts = opts
  # Temporary archives created by #archived_args. Initialised here so that
  # helper also works when called outside #run.
  @tmp_files = []

  # Environment check: each call raises on failure. The last call exports
  # HADOOP_CLASSPATH for the hadoop launcher.
  hadoop_home
  hadoop_cmd
  hadoop_classpath
end

Instance Method Details

#archived_args ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/jmapreduce/runner.rb', line 65

# Packs each directory listed in the comma-separated opts[:dirs] into a
# temporary .tgz and returns the matching `-archives` option.
#
# Each archive is listed as "<tgz>#<basename of dir>". Its path is also
# recorded in @tmp_files so #run can delete it afterwards. Entries of
# opts[:dirs] that are not directories are skipped.
#
# @return [String, nil] e.g. "-archives /tmp/a.tgz#x,/tmp/b.tgz#y", or nil
#   when opts[:dirs] is unset or names no existing directory
# @raise [RuntimeError] if tar exits unsuccessfully for a directory
def archived_args
  return unless @opts[:dirs]

  @tmp_files ||= []
  archived_files = []
  @opts[:dirs].split(',').each_with_index do |dir, index|
    next unless File.directory?(dir)

    # The index keeps names unique even within the same second.
    tgz = "/tmp/jmapreduce-#{Process.pid}-#{Time.now.to_i}-#{index}-#{rand(1000)}.tgz"
    # Register before tar runs so a partially written archive is cleaned up too.
    @tmp_files << tgz
    # Non-hidden top-level entries, matching the shell glob `*` used previously.
    entries = Dir.entries(dir).reject { |entry| entry.start_with?('.') }.sort
    # Argument-vector form: no shell is involved, so paths containing spaces
    # or shell metacharacters can neither break nor inject into the command.
    unless system('tar', '--exclude', '.git', '-czf', tgz, '-C', dir, *entries)
      raise "Failed to archive directory #{dir}"
    end
    archived_files << "#{tgz}\##{File.basename(dir)}"
  end
  # Never emit a bare "-archives" flag with no value.
  return if archived_files.empty?

  "-archives #{archived_files.join(',')}"
end

#conf_args ⇒ Object



57
58
59
60
61
62
63
# File 'lib/jmapreduce/runner.rb', line 57

# Generic Hadoop options taken from opts, in a fixed order:
# -conf (:conf), -fs (:namenode), -jt (:jobtracker).
#
# @return [String] each option that is set, rendered as "<flag> <value> "
#   (trailing space included); '' when none is set
def conf_args
  generic_flags = [[:conf, '-conf'], [:namenode, '-fs'], [:jobtracker, '-jt']]
  generic_flags.map { |key, flag| "#{flag} #{@opts[key]} " if @opts[key] }.join
end

#dirnames ⇒ Object



98
99
100
# File 'lib/jmapreduce/runner.rb', line 98

# Parent directory of every entry in #files, in the same order (duplicates
# are kept).
#
# @return [Array<String>]
def dirnames
  files.map(&File.method(:dirname))
end

#file_args ⇒ Object



53
54
55
# File 'lib/jmapreduce/runner.rb', line 53

# The `-files` option, which ships the job script and opts[:files].
#
# @return [String] "-files " followed by the comma-joined #files
def file_args
  '-files ' + files.join(',')
end

#files ⇒ Object



92
93
94
95
96
# File 'lib/jmapreduce/runner.rb', line 92

# Local files to ship with the job: the script first, followed by any
# entries from the comma-separated opts[:files].
#
# @return [Array<String>]
def files
  extra = @opts[:files]
  extra ? [@script, *extra.split(',')] : [@script]
end

#hadoop_classpath ⇒ Object



26
27
28
# File 'lib/jmapreduce/runner.rb', line 26

# Exports HADOOP_CLASSPATH as lib_path, then every entry of dirnames, then
# every entry of lib_jars, joined with ':'.
#
# Any existing HADOOP_CLASSPATH value is replaced, not extended.
#
# @return [String] the value that was exported
def hadoop_classpath
  classpath_entries = [lib_path, *dirnames, *lib_jars]
  ENV['HADOOP_CLASSPATH'] = classpath_entries.join(':')
end

#hadoop_cmd ⇒ Object



19
20
21
22
23
24
# File 'lib/jmapreduce/runner.rb', line 19

# Locates the hadoop executable: first on PATH, then at
# $HADOOP_HOME/bin/hadoop.
#
# @return [String] path to an executable regular file named hadoop
# @raise [RuntimeError] 'Please set HADOOP_HOME' (from #hadoop_home) when
#   hadoop is not on PATH and HADOOP_HOME is unset
# @raise [RuntimeError] 'Cannot find hadoop command' when neither location
#   holds an executable
def hadoop_cmd
  runnable = ->(path) { File.file?(path) && File.executable?(path) }

  # Search PATH in-process instead of shelling out to `which`, which is not
  # guaranteed to be installed. Empty PATH segments are ignored.
  on_path = ENV['PATH'].to_s.split(File::PATH_SEPARATOR).reject(&:empty?)
                       .map { |dir| File.join(dir, 'hadoop') }
                       .find(&runnable)
  return on_path if on_path

  home = hadoop_home
  fallback = "#{home}/bin/hadoop"
  # Check the fallback as well, so a wrong HADOOP_HOME fails here with a clear
  # message rather than later, when the job command itself is executed.
  raise 'Cannot find hadoop command' if home.empty? || !runnable.call(fallback)

  fallback
end

#hadoop_home ⇒ Object



14
15
16
17
# File 'lib/jmapreduce/runner.rb', line 14

# Value of the HADOOP_HOME environment variable.
#
# @return [String] the raw value (an empty string is returned as-is)
# @raise [RuntimeError] 'Please set HADOOP_HOME' when the variable is not set
def hadoop_home
  ENV.fetch('HADOOP_HOME') { raise 'Please set HADOOP_HOME' }
end

#jars_args ⇒ Object



49
50
51
# File 'lib/jmapreduce/runner.rb', line 49

# The `-libjars` option listing every jar from #lib_jars.
#
# @return [String] "-libjars " followed by the comma-joined jar paths
def jars_args
  ['-libjars', lib_jars.join(',')].join(' ')
end

#lib_jars ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/jmapreduce/runner.rb', line 102

# Jars the job needs at runtime, in this order: the JRuby core and stdlib
# jars, the JMapReduce release jar, the bundled vendor jars (gson,
# javassist, msgpack), then any entries from the comma-separated
# opts[:libjars].
#
# @return [Array<String>]
def lib_jars
  here = File.dirname(__FILE__)
  vendor_jars = %w[gson javassist msgpack].map do |name|
    File.expand_path(File.join('..', '..', 'vendors', "#{name}.jar"), here)
  end
  required = [JRubyJars.core_jar_path, JRubyJars.stdlib_jar_path, main_jar_path, *vendor_jars]
  user_jars = @opts[:libjars]
  user_jars ? required + user_jars.split(',') : required
end

#lib_path ⇒ Object



119
120
121
# File 'lib/jmapreduce/runner.rb', line 119

# Absolute path of the directory one level above this file's directory.
#
# @return [String]
def lib_path
  File.expand_path('..', File.dirname(__FILE__))
end

#main_jar_path ⇒ Object



115
116
117
# File 'lib/jmapreduce/runner.rb', line 115

# Absolute path of release/jmapreduce.jar, resolved two directories above
# this file's directory.
#
# @return [String]
def main_jar_path
  File.expand_path(File.join('..', '..', 'release', 'jmapreduce.jar'), File.dirname(__FILE__))
end

#mapred_args ⇒ Object



80
81
82
# File 'lib/jmapreduce/runner.rb', line 80

# Positional arguments for the Java main class: the script's basename,
# then the input and output paths, separated by single spaces.
#
# @return [String]
def mapred_args
  format('%s %s %s', File.basename(@script), @input, @output)
end

#properties_args ⇒ Object



84
85
86
87
88
89
90
# File 'lib/jmapreduce/runner.rb', line 84

# opts[:properties] and opts[:json] (whichever are set, in that order),
# joined with ','.
#
# @return [String] '' when neither option is set
def properties_args
  [@opts[:properties], @opts[:json]].select { |value| value }.join(',')
end

#run ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/jmapreduce/runner.rb', line 30

# Builds the `hadoop jar` command for this job, echoes it to stdout and runs
# it through the shell. Temporary archives created by #archived_args are
# deleted afterwards, whether or not the command succeeded.
#
# @return [Boolean, nil] the result of Kernel#system: true for exit status 0,
#   false for a non-zero exit, nil if the command could not be executed
def run
  @tmp_files = []

  begin
    # The command is built inside begin/ensure, so archives already written by
    # #archived_args are removed even if a later step raises.
    cmd = "#{hadoop_cmd} jar #{main_jar_path} #{JAVA_MAIN_CLASS} #{file_args} #{jars_args} #{conf_args} #{archived_args} #{mapred_args} \"#{properties_args}\""
    puts(cmd)
    system(cmd)
  ensure
    # File.exist? is used because File.exists? was removed in Ruby 3.2.
    @tmp_files.each { |tmp_file| File.delete(tmp_file) if File.exist?(tmp_file) }
  end
end