Class: Swineherd::HadoopFileSystem

Inherits:
Object
Includes:
BaseFileSystem
Defined in:
lib/swineherd/filesystem/hadoopfilesystem.rb

Overview

Methods for dealing with the Hadoop Distributed File System (HDFS). This class requires that you run with JRuby, as it makes use of the native Java Hadoop libraries.
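
A minimal usage sketch (purely illustrative; the paths are hypothetical and assume HADOOP_HOME is set as described in #check_env):

  fs = Swineherd::HadoopFileSystem.new
  fs.mkpath '/tmp/swineherd_example'     # create a directory
  fs.exists? '/tmp/swineherd_example'    #=> true
  fs.lr '/tmp/swineherd_example'         # recursively list paths beneath it
  fs.close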

Defined Under Namespace

Classes: HadoopFile

Instance Attribute Summary

Instance Method Summary

Methods included from BaseFileSystem

#check_paths

Constructor Details

#initialize(params = {}, *args) ⇒ HadoopFileSystem

Initialize a new Hadoop filesystem. Requires a Hadoop configuration, located via HADOOP_CONF_DIR or HADOOP_HOME (see #check_and_set_environment); an optional :filesystem URI selects the filesystem to connect to.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 17

def initialize params={}, *args
  check_and_set_environment
  @conf = Java::org.apache.hadoop.conf.Configuration.new
  uri = Java::java.net.URI.new params[:filesystem] if params[:filesystem]
  fs_params = uri ? [ uri, @conf ] : [ @conf ]
  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get *fs_params
end
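
As a sketch of both call forms (the HDFS URI below is a placeholder, not a real namenode):

  # Use whichever filesystem the Hadoop configuration on the classpath points to
  fs = Swineherd::HadoopFileSystem.new

  # Or name a filesystem explicitly via the :filesystem param
  fs = Swineherd::HadoopFileSystem.new :filesystem => 'hdfs://namenode.example.com:8020'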

Instance Attribute Details

#conf ⇒ Object

Returns the value of attribute conf.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def conf
  @conf
end

#hdfs ⇒ Object

Returns the value of attribute hdfs.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def hdfs
  @hdfs
end

Instance Method Details

#bzip(input, output) ⇒ Object

BZip2-compress input into output using a Hadoop Streaming identity job.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 123

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar     \\
   -D          mapred.output.compress=true                                  \\
   -D          mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec  \\
   -D          mapred.reduce.tasks=1                                        \\
   -mapper     \"/bin/cat\"                                                 \\
   -reducer    \"/bin/cat\"                                                 \\
   -input      \"#{input}\"                                                 \\
   -output     \"#{output}\"")
end
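
For example (hypothetical HDFS paths), this runs the streaming job above to write a BZip2-compressed copy of the input:

  fs = Swineherd::HadoopFileSystem.new
  fs.bzip '/data/raw_logs', '/data/raw_logs_bz2'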

#check_and_set_environment ⇒ Object

Make sure the environment is sane, then set it up for use.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 28

def check_and_set_environment
  check_env
  set_env
end

#check_env ⇒ Object

Check that we are running with JRuby and check for a Hadoop home. @hadoop_home is preferentially set to the HADOOP_HOME environment variable if it's set, to '/usr/local/share/hadoop' if HADOOP_HOME isn't defined, and to '/usr/lib/hadoop' if '/usr/local/share/hadoop' doesn't exist. If all else fails, inform the user that HADOOP_HOME really should be set.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 273

def check_env
  begin
    require 'java'
  rescue LoadError => e
    raise "\nJava not found, are you sure you're running with JRuby?\n" + e.message
  end
  @hadoop_home = (ENV['HADOOP_HOME'] || '/usr/local/share/hadoop')
  @hadoop_home = '/usr/lib/hadoop' unless File.exist? @hadoop_home
  raise "\nHadoop installation not found, try setting HADOOP_HOME\n" unless File.exist? @hadoop_home
end

#close(*args) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 176

def close *args
  @hdfs.close
end

#copy_from_local(srcfile, dstfile) ⇒ Object

Copy a local file to the HDFS filesystem.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 172

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end

#copy_to_local(srcfile, dstfile) ⇒ Object

Copy an HDFS file to the local filesystem.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 165

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end

#cp(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 66

def cp srcpath, dstpath
  FileUtil.copy(@hdfs, Path.new(srcpath), @hdfs, Path.new(dstpath), false, @conf)
end

#dist_merge(inputs, output, options = {}) ⇒ Object

Merges many input files into :reduce_tasks output files.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 138

def dist_merge inputs, output, options = {}
  options[:reduce_tasks]     ||= 25
  options[:partition_fields] ||= 2
  options[:sort_fields]      ||= 2
  options[:field_separator]  ||= '/t'
  names = inputs.map{|inp| File.basename(inp)}.join(',')
  cmd   = "#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                   \\
   -D          mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\"               \\
   -D          num.key.fields.for.partition=\"#{options[:partition_fields]}\"            \\
   -D          stream.num.map.output.key.fields=\"#{options[:sort_fields]}\"             \\
   -D          mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
   -D          stream.map.output.field.separator=\"'#{options[:field_separator]}'\"      \\
   -D          mapred.min.split.size=1000000000                                          \\
   -D          mapred.reduce.tasks=#{options[:reduce_tasks]}                             \\
   -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner                    \\
   -mapper     \"/bin/cat\"                                                              \\
   -reducer    \"/usr/bin/uniq\"                                                         \\
   -input      \"#{inputs.join(',')}\"                                                   \\
   -output     \"#{output}\""
  puts cmd
  system cmd
end
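
A usage sketch (input and output paths are hypothetical); unspecified options fall back to the defaults above. Note that the default :field_separator is the literal string '/t' as written, so pass "\t" explicitly if a tab separator is intended:

  fs = Swineherd::HadoopFileSystem.new
  fs.dist_merge ['/data/part_a', '/data/part_b'], '/data/merged',
    :reduce_tasks     => 10,
    :partition_fields => 2,
    :sort_fields      => 2,
    :field_separator  => "\t"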

#entries(dirpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 90

def entries dirpath
  return unless type(dirpath) == "directory"
  list = @hdfs.list_status(Path.new(dirpath))
  list.map{|path| path.get_path.to_s} rescue []
end

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 58

def exists? path
  @hdfs.exists(Path.new(path))
end

#lr(path) ⇒ Object

Recursively list paths



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 44

def lr path
  paths = entries(path)
  if (paths && !paths.empty?)
    paths.map{|e| lr(e)}.flatten
  else
    path
  end
end
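
For instance (hypothetical layout), lr flattens nested directory listings into a single array of leaf paths, or returns the path itself if it has no entries:

  fs = Swineherd::HadoopFileSystem.new
  fs.lr '/data/logs'   #=> e.g. ["hdfs://namenode/data/logs/2011/01/part-00000", ...]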

#merge(srcdir, dstfile) ⇒ Object

Merge all part files in a directory into one file.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 99

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end
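
For example (hypothetical paths), to collapse a job's part files into a single HDFS file:

  fs = Swineherd::HadoopFileSystem.new
  fs.merge '/data/job_output', '/data/job_output_merged'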

#mkpath(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 70

def mkpath path
  @hdfs.mkdirs(Path.new(path))
  path
end

#mv(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 62

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end

#open(path, mode = "r", &blk) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 33

def open path, mode="r", &blk
  HadoopFile.new(path,mode,self,&blk)
end

#rm(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 53

def rm path
  @hdfs.delete(Path.new(path), true)
  [path]
end

#set_env ⇒ Object

Place the Hadoop jars on the classpath, require the appropriate jars, and set the Hadoop conf.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 287

def set_env
  require 'java'
  @hadoop_conf = (ENV['HADOOP_CONF_DIR'] || File.join(@hadoop_home, 'conf'))
  @hadoop_conf += "/" unless @hadoop_conf.end_with? "/"
  $CLASSPATH << @hadoop_conf
  Dir["#{@hadoop_home}/hadoop*.jar", "#{@hadoop_home}/lib/*.jar"].each{|jar| require jar}

  java_import 'org.apache.hadoop.conf.Configuration'
  java_import 'org.apache.hadoop.fs.Path'
  java_import 'org.apache.hadoop.fs.FileSystem'
  java_import 'org.apache.hadoop.fs.FileUtil'
  java_import 'org.apache.hadoop.mapreduce.lib.input.FileInputFormat'
  java_import 'org.apache.hadoop.mapreduce.lib.output.FileOutputFormat'
  java_import 'org.apache.hadoop.fs.FSDataOutputStream'
  java_import 'org.apache.hadoop.fs.FSDataInputStream'

end

#size(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 37

def size path
  lr(path).inject(0){|sz, f| sz += @hdfs.get_file_status(Path.new(f)).get_len}
end

#stream(input, output) ⇒ Object

This is hackety. Use with caution.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 106

def stream input, output
  require 'uri'
  input_fs_scheme  = URI.parse(input).scheme
  output_fs_scheme = URI.parse(output).scheme
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                     \\
   -D          mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
   -D          mapred.min.split.size=1000000000                                            \\
   -D          mapred.reduce.tasks=0                                                       \\
   -mapper     \"/bin/cat\"                                                                \\
   -input      \"#{input}\"                                                                \\
   -output     \"#{output}\"")
end
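
A hedged example (the bucket and paths are placeholders) using the identity streaming job above to copy data between an S3-backed filesystem and HDFS:

  fs = Swineherd::HadoopFileSystem.new
  fs.stream 's3n://example-bucket/raw/events', 'hdfs:///data/events'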

#type(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 75

def type path
  return "unknown" unless exists? path
  status = @hdfs.get_file_status(Path.new(path))
  return "directory" if status.is_dir?
  "file"
  # case
  # when status.isFile then
  #   return "file"
  # when status.is_directory? then
  #   return "directory"
  # when status.is_symlink? then
  #   return "symlink"
  # end
end