Class: Swineherd::HadoopFileSystem
- Inherits: Object
- Includes: BaseFileSystem
- Defined in: lib/swineherd/filesystem/hadoopfilesystem.rb
Overview
Methods for dealing with the Hadoop Distributed File System (HDFS). This class requires that you run with JRuby, as it makes use of the native Java Hadoop libraries.
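As a quick orientation, a session might look like the sketch below. This is a hypothetical usage example, not taken from the library's own docs; since the class only works under JRuby with the Hadoop jars available, the sketch guards on the platform first.

```ruby
# Hypothetical usage sketch (not from the library's own docs); the class
# only works under JRuby with the Hadoop jars available, so guard first.
def jruby?
  RUBY_PLATFORM == 'java'
end

if jruby?
  require 'swineherd'
  fs = Swineherd::HadoopFileSystem.new   # reads Hadoop conf, connects to HDFS
  fs.mkpath '/tmp/swineherd_demo'        # create a directory tree
  fs.rm '/tmp/swineherd_demo'            # recursive delete
  fs.close
else
  puts 'Swineherd::HadoopFileSystem requires JRuby'
end
```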
Defined Under Namespace
Classes: HadoopFile
Instance Attribute Summary
- #conf ⇒ Object
  Returns the value of attribute conf.
- #hdfs ⇒ Object
  Returns the value of attribute hdfs.
Instance Method Summary
- #bzip(input, output) ⇒ Object
  BZIP.
- #check_and_set_environment ⇒ Object
  Make sure the environment is sane, then set it up for use.
- #check_env ⇒ Object
  Check that we are running with JRuby; check for the Hadoop home.
- #close(*args) ⇒ Object
- #copy_from_local(srcfile, dstfile) ⇒ Object
  Copy a local file to the HDFS filesystem.
- #copy_to_local(srcfile, dstfile) ⇒ Object
  Copy an HDFS file to the local filesystem.
- #cp(srcpath, dstpath) ⇒ Object
- #entries(dirpath) ⇒ Object
- #exists?(path) ⇒ Boolean
- #initialize(*args) ⇒ HadoopFileSystem
  constructor
  Initialize a new Hadoop file system; needs a path to the Hadoop configuration.
- #merge(srcdir, dstfile) ⇒ Object
  Merge all part files in a directory into one file.
- #mkpath(path) ⇒ Object
- #mv(srcpath, dstpath) ⇒ Object
- #open(path, mode = "r", &blk) ⇒ Object
- #rm(path) ⇒ Object
- #set_env ⇒ Object
  Place Hadoop jars on the class path, require the appropriate jars, and set the Hadoop conf.
- #stream(input, output) ⇒ Object
  This is hackety.
- #type(path) ⇒ Object
Methods included from BaseFileSystem
Constructor Details
#initialize(*args) ⇒ HadoopFileSystem
Initialize a new Hadoop file system; needs a path to the Hadoop configuration.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 17

def initialize *args
  check_and_set_environment
  @conf = Java::org.apache.hadoop.conf.Configuration.new
  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get(@conf)
end
Instance Attribute Details
#conf ⇒ Object
Returns the value of attribute conf.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def conf
  @conf
end
#hdfs ⇒ Object
Returns the value of attribute hdfs.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def hdfs
  @hdfs
end
Instance Method Details
#bzip(input, output) ⇒ Object
BZIP
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 106

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.output.compress=true \\
    -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \\
    -D mapred.reduce.tasks=1 \\
    -mapper \"/bin/cat\" \\
    -reducer \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
#check_and_set_environment ⇒ Object
Make sure the environment is sane, then set it up for use.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 26

def check_and_set_environment
  check_env
  set_env
end
#check_env ⇒ Object
Check that we are running with JRuby, and check for the Hadoop home. @hadoop_home is set preferentially to the HADOOP_HOME environment variable; to '/usr/local/share/hadoop' if HADOOP_HOME isn't defined; and to '/usr/lib/hadoop' if '/usr/local/share/hadoop' doesn't exist. If all else fails, inform the user that HADOOP_HOME really should be set.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 229

def check_env
  begin
    require 'java'
  rescue LoadError => e
    raise "\nJava not found, are you sure you're running with JRuby?\n" + e.message
  end
  @hadoop_home = (ENV['HADOOP_HOME'] || '/usr/local/share/hadoop')
  @hadoop_home = '/usr/lib/hadoop' unless File.exist? @hadoop_home
  raise "\nHadoop installation not found, try setting HADOOP_HOME\n" unless File.exist? @hadoop_home
end
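The fallback chain above can be exercised on its own in plain (non-JRuby) Ruby. `resolve_hadoop_home` below is a hypothetical standalone restatement of that logic, not part of the library's API:

```ruby
# Standalone restatement of check_env's HADOOP_HOME fallback chain
# (hypothetical helper, not part of Swineherd itself):
#   1. honor HADOOP_HOME if it points at an existing directory,
#   2. else try /usr/local/share/hadoop,
#   3. else try /usr/lib/hadoop,
#   4. else fail loudly.
def resolve_hadoop_home(env = ENV)
  home = env['HADOOP_HOME'] || '/usr/local/share/hadoop'
  home = '/usr/lib/hadoop' unless File.exist?(home)
  raise 'Hadoop installation not found, try setting HADOOP_HOME' unless File.exist?(home)
  home
end

# An existing directory set in HADOOP_HOME always wins:
puts resolve_hadoop_home('HADOOP_HOME' => Dir.pwd)
```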
#close(*args) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 132

def close *args
  @hdfs.close
end
#copy_from_local(srcfile, dstfile) ⇒ Object
Copy a local file to the HDFS filesystem.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 128

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end
#copy_to_local(srcfile, dstfile) ⇒ Object
Copy an HDFS file to the local filesystem.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 121

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end
#cp(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 49

def cp srcpath, dstpath
  FileUtil.copy(@hdfs, Path.new(srcpath), @hdfs, Path.new(dstpath), false, @conf)
end
#entries(dirpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 73

def entries dirpath
  return unless type(dirpath) == "directory"
  list = @hdfs.list_status(Path.new(dirpath))
  list.map{|path| path.get_path.to_s} rescue []
end
#exists?(path) ⇒ Boolean
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 41

def exists? path
  @hdfs.exists(Path.new(path))
end
#merge(srcdir, dstfile) ⇒ Object
Merge all part files in a directory into one file.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 82

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end
#mkpath(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 53

def mkpath path
  @hdfs.mkdirs(Path.new(path))
  path
end
#mv(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 45

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end
#open(path, mode = "r", &blk) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 31

def open path, mode="r", &blk
  HadoopFile.new(path,mode,self,&blk)
end
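`#open` delegates to the familiar Ruby block-constructor idiom: with a block, the open handle is yielded and cleaned up for you; without one, the caller holds the handle. A minimal pure-Ruby sketch of that idiom follows (`FakeFile` is a hypothetical stand-in, not the actual `HadoopFile` implementation):

```ruby
# Hypothetical FakeFile demonstrating the block-constructor idiom;
# not the library's actual HadoopFile implementation.
class FakeFile
  attr_reader :closed

  def initialize(path, mode = 'r', &blk)
    @path, @mode, @closed = path, mode, false
    if blk
      blk.call(self)  # yield the open handle to the caller...
      close           # ...and guarantee cleanup afterwards
    end
  end

  def close
    @closed = true
  end
end

f = FakeFile.new('hdfs://tmp/demo', 'w') { |file| file }
puts f.closed  # the block form closed the handle for us
```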
#rm(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 36

def rm path
  @hdfs.delete(Path.new(path), true)
  [path]
end
#set_env ⇒ Object
Place Hadoop jars on the class path, require the appropriate jars, and set the Hadoop conf.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 243

def set_env
  require 'java'
  @hadoop_conf = (ENV['HADOOP_CONF_DIR'] || File.join(@hadoop_home, 'conf'))
  @hadoop_conf += "/" unless @hadoop_conf.end_with? "/"
  $CLASSPATH << @hadoop_conf
  Dir["#{@hadoop_home}/hadoop*.jar", "#{@hadoop_home}/lib/*.jar"].each{|jar| require jar}
  java_import 'org.apache.hadoop.conf.Configuration'
  java_import 'org.apache.hadoop.fs.Path'
  java_import 'org.apache.hadoop.fs.FileSystem'
  java_import 'org.apache.hadoop.fs.FileUtil'
  java_import 'org.apache.hadoop.mapreduce.lib.input.FileInputFormat'
  java_import 'org.apache.hadoop.mapreduce.lib.output.FileOutputFormat'
  java_import 'org.apache.hadoop.fs.FSDataOutputStream'
  java_import 'org.apache.hadoop.fs.FSDataInputStream'
end
#stream(input, output) ⇒ Object
This is hackety. Use with caution.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 89

def stream input, output
  require 'uri'
  input_fs_scheme  = URI.parse(input).scheme
  output_fs_scheme = URI.parse(output).scheme
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
    -D mapred.min.split.size=1000000000 \\
    -D mapred.reduce.tasks=0 \\
    -mapper \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
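`stream` builds the job name from the URI schemes of its arguments; that scheme-parsing step is plain Ruby and can be checked directly (the `hdfs://` and `s3://` paths below are made-up examples):

```ruby
require 'uri'

# The same scheme extraction stream uses to label its identity job;
# an hdfs:// source copied to an s3:// sink, for example:
input  = 'hdfs://namenode:8020/data/part-00000'
output = 's3://bucket/data/part-00000'

input_fs_scheme  = URI.parse(input).scheme   # => "hdfs"
output_fs_scheme = URI.parse(output).scheme  # => "s3"

puts "Stream { #{input_fs_scheme}(#{File.basename(input)}) -> " \
     "#{output_fs_scheme}(#{File.basename(output)}) }"
```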
#type(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 58

def type path
  return "unknown" unless exists? path
  status = @hdfs.get_file_status(Path.new(path))
  return "directory" if status.is_dir?
  "file"
  # case
  # when status.isFile then
  #   return "file"
  # when status.is_directory? then
  #   return "directory"
  # when status.is_symlink? then
  #   return "symlink"
  # end
end