Class: Swineherd::HadoopFileSystem

Inherits:
Object
Includes:
BaseFileSystem
Defined in:
lib/swineherd/filesystem/hadoopfilesystem.rb

Overview

Methods for dealing with the Hadoop Distributed File System (HDFS). This class requires JRuby, as it makes use of the native Java Hadoop libraries.

Defined Under Namespace

Classes: HadoopFile

Instance Attribute Summary

Instance Method Summary

Methods included from BaseFileSystem

#check_paths

Constructor Details

#initialize(*args) ⇒ HadoopFileSystem

Initialize a new Hadoop file system. The Hadoop configuration is located via the environment (see #check_and_set_environment); any arguments passed are currently ignored.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 17

def initialize *args
  check_and_set_environment
  @conf = Java::org.apache.hadoop.conf.Configuration.new
  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get(@conf)
end
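
A minimal usage sketch, assuming a JRuby runtime with HADOOP_HOME pointing at a working Hadoop installation (the HDFS path below is hypothetical):

fs = Swineherd::HadoopFileSystem.new
fs.exists? "/user/example/data"   #=> true or false
fs.close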

Instance Attribute Details

#confObject

Returns the value of attribute conf, the underlying org.apache.hadoop.conf.Configuration instance.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def conf
  @conf
end

#hdfsObject

Returns the value of attribute hdfs, the underlying org.apache.hadoop.fs.FileSystem instance.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def hdfs
  @hdfs
end

Instance Method Details

#bzip(input, output) ⇒ Object

Compress the contents of an HDFS path into bzip2 format with a single-reducer Hadoop streaming job.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 106

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                  \\
   -D          mapred.output.compress=true                                               \\
   -D          mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec  \\
   -D          mapred.reduce.tasks=1                                                     \\
   -mapper     \"/bin/cat\"                                                              \\
   -reducer    \"/bin/cat\"                                                              \\
   -input      \"#{input}\"                                                              \\
   -output     \"#{output}\"")
end
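
For example (paths hypothetical), this writes a bzip2-compressed copy of the input; as with any Hadoop job, the output path must not already exist:

fs = Swineherd::HadoopFileSystem.new
fs.bzip "/data/raw", "/data/raw_bzipped"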

#check_and_set_environmentObject

Make sure the environment is sane, then set it up for use.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 26

def check_and_set_environment
  check_env
  set_env
end

#check_envObject

Check that we are running under JRuby and locate the Hadoop installation. @hadoop_home is set to the HADOOP_HOME environment variable if defined, and to '/usr/local/share/hadoop' otherwise; if that path doesn't exist it falls back to '/usr/lib/hadoop'. If all else fails, raise and inform the user that HADOOP_HOME really should be set.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 229

def check_env
  begin
    require 'java'
  rescue LoadError => e
    raise "\nJava not found, are you sure you're running with JRuby?\n" + e.message
  end
  @hadoop_home = (ENV['HADOOP_HOME'] || '/usr/local/share/hadoop')
  @hadoop_home = '/usr/lib/hadoop' unless File.exist? @hadoop_home
  raise "\nHadoop installation not found, try setting HADOOP_HOME\n" unless File.exist? @hadoop_home
end

#close(*args) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 132

def close *args
  @hdfs.close
end

#copy_from_local(srcfile, dstfile) ⇒ Object

Copy a local file to the HDFS filesystem.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 128

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end

#copy_to_local(srcfile, dstfile) ⇒ Object

Copy an HDFS file to the local filesystem.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 121

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end
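
A short round-trip sketch (local and HDFS paths hypothetical; fs is a HadoopFileSystem instance):

fs.copy_from_local "/tmp/words.txt", "/user/example/words.txt"       # local -> HDFS
fs.copy_to_local   "/user/example/words.txt", "/tmp/words_copy.txt"  # HDFS -> local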

#cp(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 49

def cp srcpath, dstpath
  FileUtil.copy(@hdfs, Path.new(srcpath), @hdfs, Path.new(dstpath), false, @conf)
end

#entries(dirpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 73

def entries dirpath
  return unless type(dirpath) == "directory"
  list = @hdfs.list_status(Path.new(dirpath))
  list.map{|path| path.get_path.to_s} rescue []
end
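
For example, listing the immediate children of a directory (path hypothetical). Non-directory paths return nil:

fs.entries "/user/example"   #=> full path strings, one per entry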

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 41

def exists? path
  @hdfs.exists(Path.new(path))
end

#merge(srcdir, dstfile) ⇒ Object

Merge all part files in a directory into one file.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 82

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end
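
Typical use is collapsing a job's part-* files into one HDFS file (paths hypothetical):

fs.merge "/data/job_output", "/data/job_output.tsv"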

#mkpath(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 53

def mkpath path
  @hdfs.mkdirs(Path.new(path))
  path
end

#mv(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 45

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end

#open(path, mode = "r", &blk) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 31

def open path, mode="r", &blk
  HadoopFile.new(path,mode,self,&blk)
end
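
A sketch of block usage, assuming HadoopFile exposes a typical #write/#read interface (its API is documented separately, not on this page; path hypothetical):

fs.open("/user/example/notes.txt", "w") do |f|
  f.write "hello from jruby\n"   # assumes HadoopFile responds to #write
end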

#rm(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 36

def rm path
  @hdfs.delete(Path.new(path), true)
  [path]
end

#set_envObject

Place the Hadoop conf directory and jars on the classpath, require the jars, and import the Hadoop Java classes.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 243

def set_env
  require 'java'
  @hadoop_conf = (ENV['HADOOP_CONF_DIR'] || File.join(@hadoop_home, 'conf'))
  @hadoop_conf += "/" unless @hadoop_conf.end_with? "/"
  $CLASSPATH << @hadoop_conf
  Dir["#{@hadoop_home}/hadoop*.jar", "#{@hadoop_home}/lib/*.jar"].each{|jar| require jar}

  java_import 'org.apache.hadoop.conf.Configuration'
  java_import 'org.apache.hadoop.fs.Path'
  java_import 'org.apache.hadoop.fs.FileSystem'
  java_import 'org.apache.hadoop.fs.FileUtil'
  java_import 'org.apache.hadoop.mapreduce.lib.input.FileInputFormat'
  java_import 'org.apache.hadoop.mapreduce.lib.output.FileOutputFormat'
  java_import 'org.apache.hadoop.fs.FSDataOutputStream'
  java_import 'org.apache.hadoop.fs.FSDataInputStream'
end

#stream(input, output) ⇒ Object

Copy data from one filesystem to another with a map-only streaming job. This is hackety; use with caution.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 89

def stream input, output
  require 'uri'
  input_fs_scheme  = URI.parse(input).scheme
  output_fs_scheme = URI.parse(output).scheme
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                     \\
   -D          mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
   -D          mapred.min.split.size=1000000000                                            \\
   -D          mapred.reduce.tasks=0                                                       \\
   -mapper     \"/bin/cat\"                                                                \\
   -input      \"#{input}\"                                                                \\
   -output     \"#{output}\"")
end
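
For example, shoveling data between two filesystems (URIs hypothetical):

fs.stream "s3n://example-bucket/logs", "hdfs://namenode/user/example/logs"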

#type(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 58

def type path
  return "unknown" unless exists? path
  status = @hdfs.get_file_status(Path.new(path))
  return "directory" if status.is_dir?
  "file"
  # case
  # when status.isFile then
  #   return "file"
  # when status.is_directory? then
  #   return "directory"
  # when status.is_symlink? then
  #   return "symlink"
  # end
end
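
Returned values are plain strings (paths hypothetical):

fs.type "/user/example"          #=> "directory"
fs.type "/user/example/foo.txt"  #=> "file"
fs.type "/no/such/path"          #=> "unknown"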