Module: WebHDFS::FileUtils
- Defined in: lib/webhdfs/fileutils.rb
Class Method Summary
-
.append(path, body, options = {}) ⇒ Object
Public: Append to HDFS file.
-
.chmod(mode, list, options = {}) ⇒ Object
Public: Change permission of one or more directories/files.
-
.chown(user, group, list, options = {}) ⇒ Object
Public: Change ownership of one or more directories/files.
-
.copy_from_local(file, path, options = {}) ⇒ Object
Public: Copy local file into HDFS.
-
.copy_from_local_via_stream(file, path, options = {}) ⇒ Object
Public: Copy local file into HDFS with IOStream.
-
.copy_to_local(path, file, options = {}) ⇒ Object
Public: Copy remote HDFS file into local.
-
.mkdir(list, options = {}) ⇒ Object
Public: Create one or more directories.
-
.mkdir_p ⇒ Object
Public: Create one or more directories recursively.
-
.private_module_function(name) ⇒ Object
Internal: Make function private.
-
.rename(src, dst, options = {}) ⇒ Object
Public: Rename a file or directory.
-
.rm(list, options = {}) ⇒ Object
Public: Remove one or more directories or files.
-
.rmr(list, options = {}) ⇒ Object
Public: Remove one or more directories/files recursively.
-
.set_atime(list, time, options = {}) ⇒ Object
Public: Set the access time of files.
-
.set_httpfs_mode(mode = true) ⇒ Object
Public: Enable or disable httpfs mode.
-
.set_kerberos(mode = true) ⇒ Object
Public: Enable or disable Kerberos authentication.
-
.set_mtime(list, time, options = {}) ⇒ Object
Public: Set the modification time of files.
-
.set_repl_factor(list, num, options = {}) ⇒ Object
Public: Set the replication factor of files.
-
.set_server(host, port, user = nil, doas = nil, proxy_address = nil, proxy_port = nil) ⇒ Object
Public: Set hostname and port number of WebHDFS.
-
.set_ssl(mode = true) ⇒ Object
Public: Enable or disable SSL.
-
.set_ssl_ca_file(ca_file) ⇒ Object
Public: Set ssl ca_file.
-
.set_ssl_verify_mode(mode) ⇒ Object
Public: Set ssl verify mode.
Instance Method Summary
-
#client ⇒ Object
Internal.
-
#fu_log(msg) ⇒ Object
Internal: Logging.
Class Method Details
.append(path, body, options = {}) ⇒ Object
Public: Append to HDFS file
path - HDFS file path
body - contents
options - :buffersize, :verbose
Examples
FileUtils.append 'remote_path', 'contents'
# File 'lib/webhdfs/fileutils.rb', line 188
def append(path, body, options = {})
  opts = options.dup
  fu_log "append #{body.bytesize} bytes to #{path}" if opts.delete(:verbose)
  client.append(path, body, opts)
end
.chmod(mode, list, options = {}) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 304
def chmod(mode, list, options = {})
  opts = options.dup
  list = [list].flatten
  fu_log format('chmod %o %s',
                mode, list.join(' ')) if opts.delete(:verbose)
  mode = format('%03o', mode) if mode.is_a? Integer
  c = client
  list.each do |entry|
    c.chmod(entry, mode, opts)
  end
end
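The source above converts an Integer mode to a zero-padded octal string before handing it to the client. The following is a minimal sketch of that conversion in isolation. `normalize_mode` is a hypothetical name used only for illustration and is not part of WebHDFS::FileUtils.

```ruby
# Hypothetical helper mirroring the conversion chmod applies:
# Integers become zero-padded octal strings, any other value
# (for example an already formatted String) passes through unchanged.
def normalize_mode(mode)
  mode.is_a?(Integer) ? format('%03o', mode) : mode
end

p normalize_mode(0o644)  # prints "644"
p normalize_mode(0o7)    # prints "007"
p normalize_mode('755')  # prints "755"
p normalize_mode(644)    # prints "1204" (decimal 644 is not octal 644)
```

Writing the mode as an octal literal (0o644 or 0644) matters, since a decimal Integer is formatted by its octal value.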
.chown(user, group, list, options = {}) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 329
def chown(user, group, list, options = {})
  opts = options.dup
  list = [list].flatten
  fu_log format('chown %s%s',
                [user, group].compact.join(':') + ' ',
                list.join(' ')) if opts.delete(:verbose)
  c = client
  list.each do |entry|
    c.chown(entry, owner: user, group: group)
  end
end
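With :verbose set, chown logs a line built from the user, the group, and the path list. The sketch below reproduces only that formatting step so the result can be inspected without a server. `chown_log_line` is a hypothetical helper, not a library method.

```ruby
# Hypothetical helper reproducing the verbose log line that chown builds.
# compact drops a nil user or group before the two are joined with ':'.
def chown_log_line(user, group, list)
  format('chown %s%s',
         [user, group].compact.join(':') + ' ',
         [list].flatten.join(' '))
end

puts chown_log_line('alice', 'staff', %w[a b])  # prints: chown alice:staff a b
puts chown_log_line('alice', nil, 'a')          # prints: chown alice a
puts chown_log_line(nil, 'staff', 'a')          # prints: chown staff a
```

Note that a nil user and a nil group produce the same shape of log line, so the log alone does not say which of the two was given.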
.copy_from_local(file, path, options = {}) ⇒ Object
Public: Copy local file into HDFS
file - local file path
path - HDFS file path
options - :overwrite, :blocksize, :replication, :mode, :buffersize, :verbose
Examples
FileUtils.copy_from_local 'local_file', 'remote_file'
# File 'lib/webhdfs/fileutils.rb', line 115
def copy_from_local(file, path, options = {})
  opts = options.dup
  fu_log "copy_from_local local=#{file} " \
         "hdfs=#{path}" if opts.delete(:verbose)
  mode = opts.delete(:mode)
  if mode
    mode = format('%03o', mode) if mode.is_a? Integer
  else
    mode = '644'
  end
  opts[:permission] = mode
  opts[:overwrite] ||= true
  client.create(path, File.new(file, 'rb').read(File.size(file)), opts)
end
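Before calling the client, the source above turns :mode into :permission and fills in :overwrite. The sketch below isolates that option handling. `upload_options` is a hypothetical helper introduced for illustration; it performs no upload.

```ruby
# Hypothetical helper reproducing the option handling of copy_from_local.
def upload_options(options = {})
  opts = options.dup
  opts.delete(:verbose)
  mode = opts.delete(:mode)
  if mode
    mode = format('%03o', mode) if mode.is_a? Integer
  else
    mode = '644'
  end
  opts[:permission] = mode
  opts[:overwrite] ||= true # assigns when the value is nil or false
  opts
end

defaults = upload_options
p defaults[:permission]                         # prints "644"
p defaults[:overwrite]                          # prints true
p upload_options(mode: 0o600)[:permission]      # prints "600"
p upload_options(overwrite: false)[:overwrite]  # prints true
```

Because `||=` assigns whenever the current value is nil or false, an explicit :overwrite => false does not survive this step in the code as listed.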
.copy_from_local_via_stream(file, path, options = {}) ⇒ Object
Public: Copy local file into HDFS with IOStream
file - local file IO handle
path - HDFS file path
options - :overwrite, :blocksize, :replication, :mode, :buffersize, :verbose
Examples
FileUtils.copy_from_local_via_stream 'local_file_IO_handle', 'remote_file'
# File 'lib/webhdfs/fileutils.rb', line 142
def copy_from_local_via_stream(file, path, options = {})
  opts = options.dup
  fu_log "copy_from_local_via_stream local=#{file} " \
         "hdfs=#{path}" if opts.delete(:verbose)
  mode = opts.delete(:mode)
  if mode
    mode = format('%03o', mode) if mode.is_a? Integer
  else
    mode = '644'
  end
  opts[:permission] = mode
  opts[:overwrite] ||= true
  client.create(path, File.new(file, 'rb'), opts)
end
.copy_to_local(path, file, options = {}) ⇒ Object
Public: Copy remote HDFS file into local
path - HDFS file path
file - local file path
options - :offset, :length, :buffersize, :verbose
Examples
FileUtils.copy_to_local 'remote_file', 'local_file'
# File 'lib/webhdfs/fileutils.rb', line 169
def copy_to_local(path, file, options = {})
  opts = options.dup
  fu_log "copy_to_local hdfs=#{path} local=#{file}" if opts.delete(:verbose)
  File.open(file, 'wb') do |f|
    f.write client.read(path, opts)
  end
end
.mkdir(list, options = {}) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 206
def mkdir(list, options = {})
  opts = options.dup
  list = [list].flatten
  fu_log "mkdir #{format('-m %03o ', options[:mode]) if options[:mode]}" \
         "#{list.join ' '}" if opts.delete(:verbose)
  mode = opts[:mode]
  if mode
    mode = format('0%03o', mode) if mode.is_a? Integer
  else
    mode = '0755'
  end
  c = client
  list.each do |dir|
    c.mkdir(dir, permission: mode)
  end
end
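Two small steps in the source above decide what is sent to the client: the list argument is normalised with `[list].flatten`, and the permission string is derived from :mode with a leading zero. A minimal sketch of both, using the hypothetical helper names `normalize_list` and `mkdir_permission`:

```ruby
# [list].flatten lets callers pass a single name or an array of names.
def normalize_list(list)
  [list].flatten
end

# Hypothetical helper mirroring how mkdir derives the permission string:
# '0755' when no mode is given, a '0'-prefixed octal string for Integers.
def mkdir_permission(mode)
  return '0755' unless mode

  mode.is_a?(Integer) ? format('0%03o', mode) : mode
end

p normalize_list('tmp')          # prints ["tmp"]
p normalize_list(%w[tmp data])   # prints ["tmp", "data"]
p mkdir_permission(nil)          # prints "0755"
p mkdir_permission(0o700)        # prints "0700"
```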
.mkdir_p ⇒ Object
Public: Create one or more directories.
list - directory name, or list of them
options - :mode, :verbose
Examples
FileUtils.mkdir 'test'
FileUtils.mkdir %w( tmp data )
FileUtils.mkdir 'tmp', :mode => 0700
Public: Create one or more directories recursively.
list - directory name, or list of them
options - :mode, :verbose
Examples
FileUtils.mkdir_p 'dir/subdir'
FileUtils.mkdir_p %w( tmp data )
FileUtils.mkdir_p 'dir/subdir', :mode => 0700
# File 'lib/webhdfs/fileutils.rb', line 235
def mkdir(list, options = {})
  opts = options.dup
  list = [list].flatten
  fu_log "mkdir #{format('-m %03o ', options[:mode]) if options[:mode]}" \
         "#{list.join ' '}" if opts.delete(:verbose)
  mode = opts[:mode]
  if mode
    mode = format('0%03o', mode) if mode.is_a? Integer
  else
    mode = '0755'
  end
  c = client
  list.each do |dir|
    c.mkdir(dir, permission: mode)
  end
end
.private_module_function(name) ⇒ Object
Internal: Make function private
# File 'lib/webhdfs/fileutils.rb', line 411
def self.private_module_function(name)
  module_function name
  private_class_method name
end
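The combination of `module_function` and `private_class_method` keeps a helper callable from other module functions while hiding it from outside callers. The sketch below shows the effect on a hypothetical module `Demo` with illustrative method names. It is not taken from the library.

```ruby
# Hypothetical module illustrating the pattern used above.
module Demo
  def self.private_module_function(name)
    module_function name      # copy the method onto the module itself
    private_class_method name # then hide that copy from outside callers
  end

  def helper
    'internal'
  end
  private_module_function :helper

  def greet
    "hello from #{helper}"
  end
  module_function :greet
end

puts Demo.greet               # prints: hello from internal
p Demo.respond_to?(:helper)   # prints false

begin
  Demo.helper
rescue NoMethodError
  puts 'helper is private'    # prints: helper is private
end
```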
.rename(src, dst, options = {}) ⇒ Object
Public: Rename a file or directory.
src - from
dst - to
options - :verbose
Examples
FileUtils.rename 'from', 'to'
# File 'lib/webhdfs/fileutils.rb', line 286
def rename(src, dst, options = {})
  opts = options.dup
  fu_log "rename #{src} #{dst}" if opts.delete(:verbose)
  client.rename(src, dst, opts)
end
.rm(list, options = {}) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 249
def rm(list, options = {})
  opts = options.dup
  list = [list].flatten
  fu_log "rm #{list.join ' '}" if opts.delete(:verbose)
  c = client
  list.each do |dir|
    c.delete(dir, recursive: opts[:recursive] || false)
  end
end
.rmr(list, options = {}) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 271
def rmr(list, options = {})
  rm(list, options.merge(recursive: true))
end
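rmr is a thin wrapper that merges :recursive => true into the options and delegates to rm. The sketch below replays that delegation against a recording stand-in so the arguments reaching the client can be inspected. `RecordingClient`, `demo_rm`, and `demo_rmr` are hypothetical names; the real methods take no client argument and obtain one internally.

```ruby
# Hypothetical stand-in for WebHDFS::Client that only records delete calls.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def delete(path, options = {})
    @calls << [path, options]
    true
  end
end

# Mirrors rm: one delete per entry, non-recursive unless asked otherwise.
def demo_rm(client, list, options = {})
  opts = options.dup
  [list].flatten.each do |entry|
    client.delete(entry, recursive: opts[:recursive] || false)
  end
end

# Mirrors rmr: force :recursive and delegate.
def demo_rmr(client, list, options = {})
  demo_rm(client, list, options.merge(recursive: true))
end

client = RecordingClient.new
demo_rm(client, 'file')
demo_rmr(client, %w[dir1 dir2])
client.calls.each { |path, opts| puts "#{path} recursive=#{opts[:recursive]}" }
```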
.set_atime(list, time, options = {}) ⇒ Object
Public: Set the access time of files
list - file/directory name or list of them
time - new access time
options - :verbose
Examples
FileUtils.set_atime 'file', Time.now
# File 'lib/webhdfs/fileutils.rb', line 374
def set_atime(list, time, options = {})
  opts = options.dup
  list = [list].flatten
  time = time.to_i
  fu_log format('set_atime %s %d',
                list.join(' '), time) if opts.delete(:verbose)
  c = client
  list.each do |entry|
    c.touch(entry, accesstime: time)
  end
end
.set_httpfs_mode(mode = true) ⇒ Object
Public: Enable or disable httpfs mode
mode - boolean (default true)
Examples
FileUtils.set_httpfs_mode
# File 'lib/webhdfs/fileutils.rb', line 48
def set_httpfs_mode(mode = true)
  @fu_httpfs_mode = mode
end
.set_kerberos(mode = true) ⇒ Object
Public: Enable or disable Kerberos authentication
mode - boolean (default true)
Examples
FileUtils.set_kerberos
# File 'lib/webhdfs/fileutils.rb', line 100
def set_kerberos(mode = true)
  @fu_kerberos = mode
end
.set_mtime(list, time, options = {}) ⇒ Object
Public: Set the modification time of files
list - file/directory name or list of them
time - new modification time
options - :verbose
Examples
FileUtils.set_mtime 'file', Time.now
# File 'lib/webhdfs/fileutils.rb', line 397
def set_mtime(list, time, options = {})
  opts = options.dup
  list = [list].flatten
  time = time.to_i
  fu_log format('set_mtime %s %d',
                list.join(' '), time) if opts.delete(:verbose)
  c = client
  list.each do |entry|
    c.touch(entry, modificationtime: time)
  end
end
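Both set_atime and set_mtime convert the time argument with `to_i` before logging it and passing it on, so a Time object is reduced to whole seconds since the epoch. A minimal sketch of that conversion and the resulting log line, using the hypothetical helper `set_mtime_log_line`:

```ruby
# Hypothetical helper reproducing the verbose log line of set_mtime.
def set_mtime_log_line(list, time)
  format('set_mtime %s %d', [list].flatten.join(' '), time.to_i)
end

stamp = Time.at(1_700_000_000)
puts set_mtime_log_line(%w[a b], stamp)  # prints: set_mtime a b 1700000000

# Time#to_i truncates any fractional seconds.
p Time.at(1.9).to_i                      # prints 1
```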
.set_repl_factor(list, num, options = {}) ⇒ Object
Public: Set the replication factor of files
list - file/directory name or list of them
num - replication factor
options - :verbose
Examples
FileUtils.set_repl_factor 'file', 3
# File 'lib/webhdfs/fileutils.rb', line 352
def set_repl_factor(list, num, options = {})
  opts = options.dup
  list = [list].flatten
  fu_log format('set_repl_factor %s %d',
                list.join(' '), num) if opts.delete(:verbose)
  c = client
  list.each do |entry|
    c.replication(entry, num, opts)
  end
end
.set_server(host, port, user = nil, doas = nil, proxy_address = nil, proxy_port = nil) ⇒ Object
Public: Set hostname and port number of WebHDFS
host - hostname
port - port
user - username
doas - proxy user name
proxy_address - address of the net http proxy to use
proxy_port - port of the net http proxy to use
Examples
FileUtils.set_server 'localhost', 50070
# File 'lib/webhdfs/fileutils.rb', line 29
def set_server(host, port, user = nil, doas = nil,
               proxy_address = nil, proxy_port = nil)
  @fu_host = host
  @fu_port = port
  @fu_user = user
  @fu_doas = doas
  @fu_paddr = proxy_address
  @fu_pport = proxy_port
end
.set_ssl(mode = true) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 61
def set_ssl(mode = true)
  @fu_ssl = mode
end
.set_ssl_ca_file(ca_file) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 74
def set_ssl_ca_file(ca_file)
  @fu_ssl_ca_file = ca_file
end
.set_ssl_verify_mode(mode) ⇒ Object
# File 'lib/webhdfs/fileutils.rb', line 87
def set_ssl_verify_mode(mode)
  @fu_ssl_verify_mode = mode
end
Instance Method Details
#client ⇒ Object
Internal
# File 'lib/webhdfs/fileutils.rb', line 427
def client
  client = WebHDFS::Client.new(@fu_host, @fu_port, @fu_user, @fu_doas,
                               @fu_paddr, @fu_pport)
  client.httpfs_mode = true if @fu_httpfs_mode
  client.ssl = true if @fu_ssl
  client.ssl_ca_file = @fu_ssl_ca_file if @fu_ssl_ca_file
  client.ssl_verify_mode = @fu_ssl_verify_mode if @fu_ssl_verify_mode
  client.kerberos = true if @fu_kerberos
  client
end
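As listed, #client builds a new client object on every call from the values stored by the set_* methods, so a setting changed later applies to the next operation. The sketch below imitates that flow without the webhdfs gem. `FakeClient` and `SettingsDemo` are hypothetical stand-ins, and only set_server and set_ssl are mirrored.

```ruby
# Hypothetical stand-in for WebHDFS::Client; it only stores what it is given.
FakeClient = Struct.new(:host, :port, :user, :doas, :paddr, :pport) do
  attr_accessor :ssl
end

# Hypothetical module mirroring the setter/factory pattern shown above.
module SettingsDemo
  module_function

  def set_server(host, port, user = nil, doas = nil,
                 proxy_address = nil, proxy_port = nil)
    @fu_host = host
    @fu_port = port
    @fu_user = user
    @fu_doas = doas
    @fu_paddr = proxy_address
    @fu_pport = proxy_port
  end

  def set_ssl(mode = true)
    @fu_ssl = mode
  end

  # A fresh client per call, configured from the stored settings.
  def client
    client = FakeClient.new(@fu_host, @fu_port, @fu_user, @fu_doas,
                            @fu_paddr, @fu_pport)
    client.ssl = true if @fu_ssl
    client
  end
end

SettingsDemo.set_server 'localhost', 50070, 'hdfs'
first = SettingsDemo.client
SettingsDemo.set_ssl
second = SettingsDemo.client

p first.ssl             # prints nil
p second.ssl            # prints true
p first.equal?(second)  # prints false
```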
#fu_log(msg) ⇒ Object
Internal: Logging
# File 'lib/webhdfs/fileutils.rb', line 419
def fu_log(msg)
  @fileutils_output ||= $stderr
  @fileutils_label ||= ''
  @fileutils_output.puts @fileutils_label + msg
end
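fu_log writes to `@fileutils_output`, falling back to `$stderr`, and prefixes each message with `@fileutils_label`. The sketch below copies the method into a hypothetical module `LogDemo` and redirects the output to a StringIO. Setting the instance variables with `instance_variable_set` is an assumption made for this illustration; the listing above shows no public setter for them.

```ruby
require 'stringio'

# Hypothetical module carrying a copy of the logging method shown above.
module LogDemo
  module_function

  def fu_log(msg)
    @fileutils_output ||= $stderr
    @fileutils_label ||= ''
    @fileutils_output.puts @fileutils_label + msg
  end
end

buffer = StringIO.new
# Assumption: redirect by setting the instance variables directly.
LogDemo.instance_variable_set(:@fileutils_output, buffer)
LogDemo.instance_variable_set(:@fileutils_label, '[hdfs] ')

LogDemo.fu_log 'rm /tmp/data'
print buffer.string  # prints: [hdfs] rm /tmp/data
```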