Class: Swineherd::HadoopFileSystem

Inherits: Object

Defined in: lib/swineherd-fs/hadoopfilesystem.rb

Overview

Methods for dealing with the Hadoop distributed file system (hdfs). This class must be run under JRuby, as it makes use of the native Java Hadoop libraries.
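
A minimal usage sketch (paths hypothetical; assumes a JRuby process with the Hadoop jars on its classpath):

fs = Swineherd::HadoopFileSystem.new
fs.mkdir_p "/tmp/swineherd_example"   # create a directory on the hdfs
fs.exists? "/tmp/swineherd_example"   #=> true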

Defined Under Namespace

Classes: HadoopFile

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(*args) ⇒ HadoopFileSystem

Returns a new instance of HadoopFileSystem.



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 12

def initialize *args
  set_hadoop_environment if running_jruby?

  @conf = Java::org.apache.hadoop.conf.Configuration.new

  # if AWS credentials are configured, register them for both the s3 (block) and s3n (native) schemes
  if Swineherd.config[:aws]
    @conf.set("fs.s3.awsAccessKeyId",Swineherd.config[:aws][:access_key])
    @conf.set("fs.s3.awsSecretAccessKey",Swineherd.config[:aws][:secret_key])

    @conf.set("fs.s3n.awsAccessKeyId",Swineherd.config[:aws][:access_key])
    @conf.set("fs.s3n.awsSecretAccessKey",Swineherd.config[:aws][:secret_key])
  end

  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get(@conf)
end
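
When S3 access is needed, the credentials must be in place before the filesystem is constructed; a minimal sketch, assuming Swineherd.config behaves like a writable Hash here and using placeholder key values:

Swineherd.config[:aws] = {
  :access_key => "PLACEHOLDER_ACCESS_KEY_ID",  # placeholder
  :secret_key => "PLACEHOLDER_SECRET_KEY"      # placeholder
}
fs = Swineherd::HadoopFileSystem.new           # copies the s3/s3n credentials into @conf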

Instance Attribute Details

#conf ⇒ Object

Returns the value of attribute conf.



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 10

def conf
  @conf
end

#hdfs ⇒ Object

Returns the value of attribute hdfs.



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 10

def hdfs
  @hdfs
end

Instance Method Details

#bzip(input, output) ⇒ Object

Compress input to output with the BZip2 codec, using a single-reducer identity streaming job



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 131

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar     \\
   -D          mapred.output.compress=true                                  \\
   -D          mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec  \\
   -D          mapred.reduce.tasks=1                                        \\
   -mapper     \"/bin/cat\"                                                 \\
   -reducer    \"/bin/cat\"                                                 \\
   -input      \"#{input}\"                                                 \\
   -output     \"#{output}\"")
end
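
A usage sketch (paths hypothetical); because the job runs a lone /bin/cat reducer (mapred.reduce.tasks=1), the output directory ends up holding a single compressed part file:

fs = Swineherd::HadoopFileSystem.new
fs.bzip "/data/raw_logs", "/data/raw_logs_bz2"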

#copy_from_local(srcfile, dstfile) ⇒ Object

Copy a local file to the hdfs filesystem



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 99

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end

#copy_to_local(srcfile, dstfile) ⇒ Object

Copy an hdfs file to the local filesystem



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 91

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end
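
The two copy helpers mirror each other; a round-trip sketch with hypothetical paths:

fs = Swineherd::HadoopFileSystem.new
fs.copy_from_local "/tmp/events.tsv", "/data/events.tsv"   # local -> hdfs
fs.copy_to_local   "/data/events.tsv", "/tmp/copy.tsv"     # hdfs -> local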

#cp(srcpath, dstpath) ⇒ Object

Supports s3://, s3n://, and hdfs:// schemes in srcpath and dstpath



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 74

def cp srcpath, dstpath
  @src_fs  = Java::org.apache.hadoop.fs.FileSystem.get(Java::JavaNet::URI.create(srcpath),@conf)
  @dest_fs = Java::org.apache.hadoop.fs.FileSystem.get(Java::JavaNet::URI.create(dstpath),@conf)
  FileUtil.copy(@src_fs, Path.new(srcpath),@dest_fs, Path.new(dstpath), false, @conf)
end
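
Because each endpoint gets its own FileSystem resolved from its URI, the two schemes may differ; a sketch (bucket and paths hypothetical):

fs = Swineherd::HadoopFileSystem.new
fs.cp "s3n://example-bucket/logs/2011-01-01.tsv", "hdfs:///data/logs/2011-01-01.tsv"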

#cp_r(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 80

def cp_r srcpath,dstpath
  cp srcpath,dstpath
end

#directory?(path) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 61

def directory? path
  exists?(path) && @hdfs.get_file_status(Path.new(path)).is_dir?
end

#dist_merge(inputs, output, options = {}) ⇒ Object

Merges many input files into options[:reduce_tasks] output files



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 146

def dist_merge inputs, output, options = {}
  options[:reduce_tasks]     ||= 25
  options[:partition_fields] ||= 2
  options[:sort_fields]      ||= 2
  options[:field_separator]  ||= "\t"
  names = inputs.map{|inp| File.basename(inp)}.join(',')
  cmd   = "#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                   \\
   -D          mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\"               \\
   -D          num.key.fields.for.partition=\"#{options[:partition_fields]}\"            \\
   -D          stream.num.map.output.key.fields=\"#{options[:sort_fields]}\"             \\
   -D          mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
   -D          stream.map.output.field.separator=\"'#{options[:field_separator]}'\"      \\
   -D          mapred.min.split.size=1000000000                                          \\
   -D          mapred.reduce.tasks=#{options[:reduce_tasks]}                             \\
   -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner                    \\
   -mapper     \"/bin/cat\"                                                              \\
   -reducer    \"/usr/bin/uniq\"                                                         \\
   -input      \"#{inputs.join(',')}\"                                                   \\
   -output     \"#{output}\""
  puts cmd
  system cmd
end
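
A usage sketch (inputs and option values hypothetical):

fs = Swineherd::HadoopFileSystem.new
fs.dist_merge ["/data/part_a", "/data/part_b"], "/data/merged",
  :reduce_tasks => 10, :partition_fields => 1, :sort_fields => 1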

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 57

def exists? path
  @hdfs.exists(Path.new(path))
end

#file?(path) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 65

def file? path
  exists?(path) && @hdfs.isFile(Path.new(path))
end

#ls(path) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 36

def ls path
  (@hdfs.list_status(Path.new(path)) || []).map{|status| status.get_path.to_s}
end

#ls_r(path) ⇒ Object

List directories recursively, similar to Unix 'ls -R'



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 41

def ls_r path
  ls(path).inject([]){|rec_paths,path| rec_paths << path; rec_paths << ls(path) unless file?(path); rec_paths}.flatten
end
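
For example, with a hypothetical layout where /data holds one file and one subdirectory (returned paths are fully qualified URIs, abbreviated here):

fs = Swineherd::HadoopFileSystem.new
fs.ls   "/data"   #=> [".../data/a.tsv", ".../data/sub"]
fs.ls_r "/data"   #=> [".../data/a.tsv", ".../data/sub", ".../data/sub/b.tsv"]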

#merge(srcdir, dstfile) ⇒ Object

Merge all part files in a directory into one file.



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 108

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end
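
A usage sketch (paths hypothetical), typically applied to the part-00000, part-00001, ... files a job leaves behind:

fs = Swineherd::HadoopFileSystem.new
fs.merge "/data/job_output", "/data/job_output.tsv"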

#mkdir_p(path) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 84

def mkdir_p path
  @hdfs.mkdirs(Path.new(path))
end

#mv(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 69

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end

#open(path, mode = "r", &blk) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 28

def open path, mode="r", &blk
  HadoopFile.new(path,mode,self,&blk)
end
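
A usage sketch, assuming HadoopFile (defined elsewhere in this file) follows the usual File.open block convention of yielding the handle and closing it when the block exits:

fs = Swineherd::HadoopFileSystem.new
fs.open("/data/notes.txt", "w") do |f|
  f.write "hello from jruby\n"
end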

#rm(path) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 45

def rm path
  begin
    @hdfs.delete(Path.new(path), false)
  rescue java.io.IOException => e
    raise Errno::EISDIR, e.message
  end
end
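
Because the delete is non-recursive, removing a non-empty directory surfaces as Errno::EISDIR; a sketch with hypothetical paths:

fs = Swineherd::HadoopFileSystem.new
fs.rm "/data/old_file.tsv"     # single file: fine
begin
  fs.rm "/data/old_dir"        # non-empty directory: raises
rescue Errno::EISDIR
  fs.rm_r "/data/old_dir"      # fall back to recursive delete
end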

#rm_r(path) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 53

def rm_r path
  @hdfs.delete(Path.new(path), true)
end

#size(path) ⇒ Object



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 32

def size path
  ls_r(path).inject(0){|sz,filepath| sz += @hdfs.get_file_status(Path.new(filepath)).get_len}
end

#stream(input, output) ⇒ Object

Copy data between filesystems via a map-only identity streaming job. This is hackety. Use with caution.



# File 'lib/swineherd-fs/hadoopfilesystem.rb', line 115

def stream input, output
  input_fs_scheme  = (Java::JavaNet::URI.create(input).scheme || "file") + "://"
  output_fs_scheme = (Java::JavaNet::URI.create(output).scheme || "file") + "://"
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                     \\
   -D          mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
   -D          mapred.min.split.size=1000000000                                            \\
   -D          mapred.reduce.tasks=0                                                       \\
   -mapper     \"/bin/cat\"                                                                \\
   -input      \"#{input}\"                                                                \\
   -output     \"#{output}\"")
end