Class: Swineherd::HadoopFileSystem
Inherits: Object
Includes: BaseFileSystem
Defined in: lib/swineherd/filesystem/hadoopfilesystem.rb
Overview
Methods for dealing with the Hadoop distributed file system (HDFS). This class requires that you run under JRuby, as it makes use of the native Java Hadoop libraries.
Defined Under Namespace
Classes: HadoopFile
Instance Attribute Summary
-
#conf ⇒ Object
Returns the value of attribute conf.
-
#hdfs ⇒ Object
Returns the value of attribute hdfs.
Instance Method Summary
-
#bzip(input, output) ⇒ Object
BZIP.
-
#check_and_set_environment ⇒ Object
Make sure the environment is sane, then set it up for use.
-
#check_env ⇒ Object
Check that we are running with jruby, check for hadoop home.
- #close(*args) ⇒ Object
-
#copy_from_local(srcfile, dstfile) ⇒ Object
Copy local file to hdfs filesystem.
-
#copy_to_local(srcfile, dstfile) ⇒ Object
Copy hdfs file to local filesystem.
- #cp(srcpath, dstpath) ⇒ Object
-
#dist_merge(inputs, output, options = {}) ⇒ Object
Merges many input files into :reduce_tasks output files.
- #entries(dirpath) ⇒ Object
- #exists?(path) ⇒ Boolean
-
#initialize(params = {}, *args) ⇒ HadoopFileSystem
constructor
Initialize a new Hadoop file system; needs the path to the Hadoop configuration.
-
#lr(path) ⇒ Object
Recursively list paths.
-
#merge(srcdir, dstfile) ⇒ Object
Merge all part files in a directory into one file.
- #mkpath(path) ⇒ Object
- #mv(srcpath, dstpath) ⇒ Object
- #open(path, mode = "r", &blk) ⇒ Object
- #rm(path) ⇒ Object
-
#set_env ⇒ Object
Place hadoop jars in class path, require appropriate jars, set hadoop conf.
- #size(path) ⇒ Object
-
#stream(input, output) ⇒ Object
This is hackety.
- #type(path) ⇒ Object
Methods included from BaseFileSystem
Constructor Details
#initialize(params = {}, *args) ⇒ HadoopFileSystem
Initialize a new Hadoop file system; needs the path to the Hadoop configuration.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 17

def initialize params={}, *args
  check_and_set_environment
  @conf = Java::org.apache.hadoop.conf.Configuration.new
  uri = Java::java.net.URI.new params[:filesystem] if params[:filesystem]
  fs_params = uri ? [ uri, @conf ] : [ @conf ]
  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get *fs_params
end
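A minimal usage sketch under JRuby; the namenode URI and paths below are placeholders, not taken from this page:

require 'swineherd'

hdfs = Swineherd::HadoopFileSystem.new(:filesystem => 'hdfs://namenode:8020')  # URI is illustrative
hdfs.mkpath '/tmp/swineherd_example'
puts hdfs.exists?('/tmp/swineherd_example')  # => true
hdfs.close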
Instance Attribute Details
#conf ⇒ Object
Returns the value of attribute conf.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def conf
  @conf
end
#hdfs ⇒ Object
Returns the value of attribute hdfs.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def hdfs
  @hdfs
end
Instance Method Details
#bzip(input, output) ⇒ Object
BZIP
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 123

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.output.compress=true \\
    -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \\
    -D mapred.reduce.tasks=1 \\
    -mapper \"/bin/cat\" \\
    -reducer \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
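For example, compressing a directory of text output into BZip2 via the identity streaming job (paths are hypothetical):

hdfs.bzip '/data/logs/2011-01-01', '/data/logs/2011-01-01.bz2'  # paths are illustrative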
#check_and_set_environment ⇒ Object
Make sure the environment is sane, then set it up for use.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 28

def check_and_set_environment
  check_env
  set_env
end
#check_env ⇒ Object
Check that we are running with jruby, check for hadoop home. hadoop_home is preferentially set to the HADOOP_HOME environment variable if it’s set, ‘/usr/local/share/hadoop’ if HADOOP_HOME isn’t defined, and ‘/usr/lib/hadoop’ if ‘/usr/local/share/hadoop’ doesn’t exist. If all else fails inform the user that HADOOP_HOME really should be set.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 273

def check_env
  begin
    require 'java'
  rescue LoadError => e
    raise "\nJava not found, are you sure you're running with JRuby?\n" + e.message
  end
  @hadoop_home = (ENV['HADOOP_HOME'] || '/usr/local/share/hadoop')
  @hadoop_home = '/usr/lib/hadoop' unless File.exist? @hadoop_home
  raise "\nHadoop installation not found, try setting HADOOP_HOME\n" unless File.exist? @hadoop_home
end
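If Hadoop lives somewhere other than the default locations, a sketch like the following sets HADOOP_HOME before the filesystem is constructed (the install path is hypothetical):

ENV['HADOOP_HOME'] ||= '/opt/hadoop'  # hypothetical install location
hdfs = Swineherd::HadoopFileSystem.new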
#close(*args) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 176

def close *args
  @hdfs.close
end
#copy_from_local(srcfile, dstfile) ⇒ Object
Copy local file to hdfs filesystem
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 172

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end
#copy_to_local(srcfile, dstfile) ⇒ Object
Copy hdfs file to local filesystem
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 165

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end
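Both copy directions in one short sketch; the file names are made up:

hdfs.copy_from_local '/tmp/local_input.tsv', '/data/input.tsv'      # local -> HDFS
hdfs.copy_to_local   '/data/output/part-00000', '/tmp/part-00000'   # HDFS -> local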
#cp(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 66

def cp srcpath, dstpath
  FileUtil.copy(@hdfs, Path.new(srcpath), @hdfs, Path.new(dstpath), false, @conf)
end
#dist_merge(inputs, output, options = {}) ⇒ Object
Merges many input files into :reduce_tasks output files.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 138

def dist_merge inputs, output, options = {}
  options[:reduce_tasks]     ||= 25
  options[:partition_fields] ||= 2
  options[:sort_fields]      ||= 2
  options[:field_separator]  ||= '/t'
  names = inputs.map{|inp| File.basename(inp)}.join(',')
  cmd = "#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\" \\
    -D num.key.fields.for.partition=\"#{options[:partition_fields]}\" \\
    -D stream.num.map.output.key.fields=\"#{options[:sort_fields]}\" \\
    -D mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
    -D stream.map.output.field.separator=\"'#{options[:field_separator]}'\" \\
    -D mapred.min.split.size=1000000000 \\
    -D mapred.reduce.tasks=#{options[:reduce_tasks]} \\
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \\
    -mapper \"/bin/cat\" \\
    -reducer \"/usr/bin/uniq\" \\
    -input \"#{inputs.join(',')}\" \\
    -output \"#{output}\""
  puts cmd
  system cmd
end
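A usage sketch; the inputs, output, and option values are illustrative only:

hdfs.dist_merge ['/data/2011-01-01', '/data/2011-01-02'], '/data/merged',
  :reduce_tasks => 10, :partition_fields => 2, :sort_fields => 2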
#entries(dirpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 90

def entries dirpath
  return unless type(dirpath) == "directory"
  list = @hdfs.list_status(Path.new(dirpath))
  list.map{|path| path.get_path.to_s} rescue []
end
#exists?(path) ⇒ Boolean
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 58

def exists? path
  @hdfs.exists(Path.new(path))
end
#lr(path) ⇒ Object
Recursively list paths
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 44

def lr path
  paths = entries(path)
  if (paths && !paths.empty?)
    paths.map{|e| lr(e)}.flatten
  else
    path
  end
end
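For instance (paths and results are illustrative), #lr flattens a directory tree into its leaf paths, which is the listing #size folds over to total file lengths:

hdfs.lr '/data'    # => ["/data/a/part-00000", "/data/a/part-00001", ...]
hdfs.size '/data'  # sums get_len over the same recursive listing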
#merge(srcdir, dstfile) ⇒ Object
Merge all part files in a directory into one file.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 99

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end
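For example, collapsing a job's part files into a single HDFS file (paths hypothetical):

hdfs.merge '/data/output', '/data/output.tsv'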
#mkpath(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 70

def mkpath path
  @hdfs.mkdirs(Path.new(path))
  path
end
#mv(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 62

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end
#open(path, mode = "r", &blk) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 33

def open path, mode="r", &blk
  HadoopFile.new(path,mode,self,&blk)
end
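HadoopFile's IO interface is not documented on this page, so the block form below is only a sketch that assumes a write method; the path is made up:

hdfs.open('/data/notes.txt', 'w') do |f|   # write call is an assumption about HadoopFile
  f.write "hello from swineherd\n"
end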
#rm(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 53

def rm path
  @hdfs.delete(Path.new(path), true)
  [path]
end
#set_env ⇒ Object
Place hadoop jars in class path, require appropriate jars, set hadoop conf
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 287

def set_env
  require 'java'
  @hadoop_conf = (ENV['HADOOP_CONF_DIR'] || File.join(@hadoop_home, 'conf'))
  @hadoop_conf += "/" unless @hadoop_conf.end_with? "/"
  $CLASSPATH << @hadoop_conf
  Dir["#{@hadoop_home}/hadoop*.jar", "#{@hadoop_home}/lib/*.jar"].each{|jar| require jar}

  java_import 'org.apache.hadoop.conf.Configuration'
  java_import 'org.apache.hadoop.fs.Path'
  java_import 'org.apache.hadoop.fs.FileSystem'
  java_import 'org.apache.hadoop.fs.FileUtil'
  java_import 'org.apache.hadoop.mapreduce.lib.input.FileInputFormat'
  java_import 'org.apache.hadoop.mapreduce.lib.output.FileOutputFormat'
  java_import 'org.apache.hadoop.fs.FSDataOutputStream'
  java_import 'org.apache.hadoop.fs.FSDataInputStream'
end
#size(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 37

def size path
  lr(path).inject(0){|sz, f| sz += @hdfs.get_file_status(Path.new(f)).get_len}
end
#stream(input, output) ⇒ Object
This is hackety. Use with caution.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 106

def stream input, output
  require 'uri'
  input_fs_scheme  = URI.parse(input).scheme
  output_fs_scheme = URI.parse(output).scheme
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
    -D mapred.min.split.size=1000000000 \\
    -D mapred.reduce.tasks=0 \\
    -mapper \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
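A sketch of moving data between filesystems with the identity streaming job; both URIs are placeholders:

hdfs.stream 's3n://example-bucket/input', 'hdfs:///data/streamed_copy'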
#type(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 75

def type path
  return "unknown" unless exists? path
  status = @hdfs.get_file_status(Path.new(path))
  return "directory" if status.is_dir?
  "file"
  # case
  # when status.isFile then
  #   return "file"
  # when status.is_directory? then
  #   return "directory"
  # when status.is_symlink? then
  #   return "symlink"
  # end
end
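Illustrative return values (paths made up):

hdfs.type '/data'              # => "directory"
hdfs.type '/data/part-00000'   # => "file"
hdfs.type '/no/such/path'      # => "unknown"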