Class: Swineherd::HadoopFileSystem
- Inherits: Object
- Defined in: lib/swineherd-fs/hadoopfilesystem.rb
Overview
Methods for dealing with the Hadoop Distributed File System (HDFS). This class requires JRuby, as it makes use of the native Java Hadoop libraries.
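A minimal usage sketch under JRuby (the require path is assumed from the lib/ layout; the HDFS paths are hypothetical):

require 'swineherd-fs'                  # assumed require path

fs = Swineherd::HadoopFileSystem.new    # builds a Hadoop Configuration and FileSystem handle
fs.mkdir_p "/tmp/swineherd_example"     # hypothetical path
fs.ls "/tmp"                            # => array of fully qualified paths under /tmp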
Defined Under Namespace
Classes: HadoopFile
Instance Attribute Summary
-
#conf ⇒ Object
Returns the value of attribute conf.
-
#hdfs ⇒ Object
Returns the value of attribute hdfs.
Instance Method Summary
-
#bzip(input, output) ⇒ Object
Compress input to a single bzip2-compressed output file via Hadoop streaming.
-
#copy_from_local(srcfile, dstfile) ⇒ Object
Copy local file to hdfs filesystem.
-
#copy_to_local(srcfile, dstfile) ⇒ Object
Copy hdfs file to local filesystem.
-
#cp(srcpath, dstpath) ⇒ Object
Supports s3://, s3n://, and hdfs:// schemes in srcpath and dstpath.
- #cp_r(srcpath, dstpath) ⇒ Object
- #directory?(path) ⇒ Boolean
-
#dist_merge(inputs, output, options = {}) ⇒ Object
Merges many input files into :reduce_tasks output files.
- #exists?(path) ⇒ Boolean
- #file?(path) ⇒ Boolean
-
#initialize(*args) ⇒ HadoopFileSystem
constructor
A new instance of HadoopFileSystem.
- #ls(path) ⇒ Object
-
#ls_r(path) ⇒ Object
List directories recursively, similar to Unix ‘ls -R’.
-
#merge(srcdir, dstfile) ⇒ Object
Merge all part files in a directory into one file.
- #mkdir_p(path) ⇒ Object
- #mv(srcpath, dstpath) ⇒ Object
- #open(path, mode = "r", &blk) ⇒ Object
- #rm(path) ⇒ Object
- #rm_r(path) ⇒ Object
- #size(path) ⇒ Object
-
#stream(input, output) ⇒ Object
This is hackety.
Constructor Details
#initialize(*args) ⇒ HadoopFileSystem
Returns a new instance of HadoopFileSystem.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 12

def initialize *args
  set_hadoop_environment if running_jruby?
  @conf = Java::org.apache.hadoop.conf.Configuration.new
  if Swineherd.config[:aws]
    @conf.set("fs.s3.awsAccessKeyId", Swineherd.config[:aws][:access_key])
    @conf.set("fs.s3.awsSecretAccessKey", Swineherd.config[:aws][:secret_key])
    @conf.set("fs.s3n.awsAccessKeyId", Swineherd.config[:aws][:access_key])
    @conf.set("fs.s3n.awsSecretAccessKey", Swineherd.config[:aws][:secret_key])
  end
  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get(@conf)
end
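If you need the s3:// or s3n:// schemes, the AWS credentials must be present in Swineherd.config before the filesystem is constructed. A hedged sketch (the assignment form and the key values are assumptions; only the :aws/:access_key/:secret_key keys are taken from the constructor above):

Swineherd.config[:aws] = {
  :access_key => "AKIA...",   # placeholder
  :secret_key => "..."        # placeholder
}
fs = Swineherd::HadoopFileSystem.new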
Instance Attribute Details
#conf ⇒ Object
Returns the value of attribute conf.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 10

def conf
  @conf
end
#hdfs ⇒ Object
Returns the value of attribute hdfs.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 10

def hdfs
  @hdfs
end
Instance Method Details
#bzip(input, output) ⇒ Object
Compress input to a single bzip2-compressed output file via Hadoop streaming.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 131

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.output.compress=true \\
    -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \\
    -D mapred.reduce.tasks=1 \\
    -mapper \"/bin/cat\" \\
    -reducer \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
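For illustration, a hedged call (paths are hypothetical; the single-reducer job writes one bzip2-compressed part file under the output directory):

fs.bzip "/data/raw/events",           # hypothetical HDFS input directory
        "/data/compressed/events_bz2" # output directory; one bzip2 part file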
#copy_from_local(srcfile, dstfile) ⇒ Object
Copy local file to hdfs filesystem
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 99

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end
#copy_to_local(srcfile, dstfile) ⇒ Object
Copy hdfs file to local filesystem
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 91

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end
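A round-trip sketch combining the two copy helpers (local and HDFS paths are hypothetical):

fs.copy_from_local "/tmp/local_input.tsv", "/data/input.tsv"     # local -> HDFS
fs.copy_to_local   "/data/input.tsv",      "/tmp/roundtrip.tsv"  # HDFS -> local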
#cp(srcpath, dstpath) ⇒ Object
Supports s3://, s3n://, and hdfs:// schemes in srcpath and dstpath.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 74

def cp srcpath, dstpath
  @src_fs  = Java::org.apache.hadoop.fs.FileSystem.get(Java::JavaNet::URI.create(srcpath), @conf)
  @dest_fs = Java::org.apache.hadoop.fs.FileSystem.get(Java::JavaNet::URI.create(dstpath), @conf)
  FileUtil.copy(@src_fs, Path.new(srcpath), @dest_fs, Path.new(dstpath), false, @conf)
end
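Because cp resolves a FileSystem per URI, the source and destination may use different schemes. A hedged example (the bucket and paths are hypothetical, and the s3n:// scheme assumes the AWS keys were set before construction):

fs.cp "s3n://example-bucket/logs/2011-01-01.tsv",  # hypothetical S3 source
      "hdfs:///data/logs/2011-01-01.tsv"           # hypothetical HDFS destination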
#cp_r(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 80

def cp_r srcpath, dstpath
  cp srcpath, dstpath
end
#directory?(path) ⇒ Boolean
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 61

def directory? path
  exists?(path) && @hdfs.get_file_status(Path.new(path)).is_dir?
end
#dist_merge(inputs, output, options = {}) ⇒ Object
Merges many input files into :reduce_tasks output files.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 146

def dist_merge inputs, output, options = {}
  options[:reduce_tasks]     ||= 25
  options[:partition_fields] ||= 2
  options[:sort_fields]      ||= 2
  options[:field_separator]  ||= '/t'
  names = inputs.map{|inp| File.basename(inp)}.join(',')
  cmd   = "#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\" \\
    -D num.key.fields.for.partition=\"#{options[:partition_fields]}\" \\
    -D stream.num.map.output.key.fields=\"#{options[:sort_fields]}\" \\
    -D mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
    -D stream.map.output.field.separator=\"'#{options[:field_separator]}'\" \\
    -D mapred.min.split.size=1000000000 \\
    -D mapred.reduce.tasks=#{options[:reduce_tasks]} \\
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \\
    -mapper \"/bin/cat\" \\
    -reducer \"/usr/bin/uniq\" \\
    -input \"#{inputs.join(',')}\" \\
    -output \"#{output}\""
  puts cmd
  system cmd
end
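A hedged invocation (input and output paths are hypothetical; the options shown simply override the defaults above):

fs.dist_merge ["/data/part_a", "/data/part_b"], "/data/merged",
              :reduce_tasks => 10, :partition_fields => 1, :sort_fields => 1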
#exists?(path) ⇒ Boolean
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 57

def exists? path
  @hdfs.exists(Path.new(path))
end
#file?(path) ⇒ Boolean
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 65

def file? path
  exists?(path) && @hdfs.isFile(Path.new(path))
end
#ls(path) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 36

def ls path
  (@hdfs.list_status(Path.new(path)) || []).map{|path| path.get_path.to_s}
end
#ls_r(path) ⇒ Object
List directories recursively, similar to Unix ‘ls -R’.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 41

def ls_r path
  ls(path).inject([]){|rec_paths, path| rec_paths << path; rec_paths << ls(path) unless file?(path); rec_paths}.flatten
end
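For illustration (paths hypothetical): ls returns the immediate children, ls_r walks the whole tree, and size sums file lengths over that walk:

fs.ls   "/data"   # => ["hdfs://namenode/data/2011", ...]
fs.ls_r "/data"   # => every file and directory beneath /data
fs.size "/data"   # => total bytes of all files found by ls_r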
#merge(srcdir, dstfile) ⇒ Object
Merge all part files in a directory into one file.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 108

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end
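A hedged example of collapsing a job's part files into a single HDFS file (paths hypothetical):

fs.merge "/data/wordcount_output",     # directory of part-00000, part-00001, ...
         "/data/wordcount_merged.tsv"  # single concatenated file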
#mkdir_p(path) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 84

def mkdir_p path
  @hdfs.mkdirs(Path.new(path))
end
#mv(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 69

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end
#open(path, mode = "r", &blk) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 28

def open path, mode="r", &blk
  HadoopFile.new(path, mode, self, &blk)
end
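A hedged sketch of the block form; it assumes HadoopFile exposes a write-style interface when opened with "w" (the HadoopFile API is not documented on this page, and the path is hypothetical):

fs.open("/data/notes.txt", "w") do |f|  # hypothetical path
  f.write "hello from swineherd\n"      # assumes HadoopFile responds to #write
end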
#rm(path) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 45

def rm path
  begin
    @hdfs.delete(Path.new(path), false)
  rescue java.io.IOException => e
    raise Errno::EISDIR, e.message
  end
end
#rm_r(path) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 53

def rm_r path
  @hdfs.delete(Path.new(path), true)
end
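For illustration (paths hypothetical): rm deletes a single file and raises Errno::EISDIR when pointed at a directory, while rm_r deletes recursively:

fs.rm   "/data/old_file.tsv"  # deletes one file; Errno::EISDIR if it is a directory
fs.rm_r "/data/old_output"    # deletes a directory and everything under it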
#size(path) ⇒ Object
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 32

def size path
  ls_r(path).inject(0){|sz, filepath| sz += @hdfs.get_file_status(Path.new(filepath)).get_len}
end
#stream(input, output) ⇒ Object
This is hackety. Use with caution.
# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 115

def stream input, output
  input_fs_scheme  = (Java::JavaNet::URI.create(input).scheme  || "file") + "://"
  output_fs_scheme = (Java::JavaNet::URI.create(output).scheme || "file") + "://"
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
    -D mapred.min.split.size=1000000000 \\
    -D mapred.reduce.tasks=0 \\
    -mapper \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
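A hedged example: stream can copy data between filesystems, e.g. from S3 into HDFS, as a map-only streaming job (bucket and paths are hypothetical):

fs.stream "s3n://example-bucket/raw/events",  # hypothetical input
          "hdfs:///data/events"               # hypothetical output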