Class: Masamune::Filesystem
- Inherits:
-
Object
- Object
- Masamune::Filesystem
- Includes:
- Actions::HadoopFilesystem, Actions::S3Cmd, HasEnvironment
- Defined in:
- lib/masamune/filesystem.rb
Constant Summary collapse
- FILE_MODE =
0777 - File.umask
Instance Attribute Summary collapse
-
#paths ⇒ Object
readonly
Returns the value of attribute paths.
Instance Method Summary collapse
- #add_path(symbol, path, options = {}) ⇒ Object
- #basename(path) ⇒ Object
- #cat(*files) ⇒ Object
- #chown!(*files) ⇒ Object
- #clear! ⇒ Object
- #copy_dir(src, dst) ⇒ Object
- #copy_file_to_dir(src, dst) ⇒ Object
- #copy_file_to_file(src, dst) ⇒ Object
- #dirname(path) ⇒ Object
- #eval_path(path) ⇒ Object
- #exists?(file) ⇒ Boolean
- #expand_params(fs, path) ⇒ Object
- #get_path(symbol, *args) ⇒ Object (also: #path)
- #glob(pattern, options = {}) ⇒ Object
- #glob_sort(pattern, options = {}) ⇒ Object
- #glob_split(input, options = {}) ⇒ Object
- #glob_stat(pattern, options = {}) ⇒ Object
- #glob_to_regexp(input, options = {}) ⇒ Object
-
#initialize ⇒ Filesystem
constructor
A new instance of Filesystem.
- #mkdir!(*dirs) ⇒ Object
- #mktemp!(path) ⇒ Object
- #mktempdir!(path) ⇒ Object
- #move_dir(src, dst) ⇒ Object
- #move_file_to_dir(src, dst) ⇒ Object
- #move_file_to_file(src, dst) ⇒ Object
- #parent_paths(path) ⇒ Object
- #path?(symbol) ⇒ Boolean
- #relative_path?(path) ⇒ Boolean
- #remove_dir(dir) ⇒ Object
- #remove_file(file) ⇒ Object
- #resolve_file(paths = []) ⇒ Object
- #root_path?(path) ⇒ Boolean
- #stat(file_or_dir) ⇒ Object
- #touch!(*files) ⇒ Object
- #write(buf, dst) ⇒ Object
Methods included from Actions::HadoopFilesystem
Methods included from Actions::S3Cmd
Methods included from Commands::S3Cmd::ClassMethods
Methods included from HasEnvironment
Constructor Details
#initialize ⇒ Filesystem
Returns a new instance of Filesystem.
35 36 37 38 |
# File 'lib/masamune/filesystem.rb', line 35 def initialize @paths = {} @immutable_paths = {} end |
Instance Attribute Details
#paths ⇒ Object (readonly)
Returns the value of attribute paths.
76 77 78 |
# File 'lib/masamune/filesystem.rb', line 76 def paths @paths end |
Instance Method Details
#add_path(symbol, path, options = {}) ⇒ Object
44 45 46 47 48 49 50 51 52 |
# File 'lib/masamune/filesystem.rb', line 44 def add_path(symbol, path, = {}) ||= {} .symbolize_keys! eager_path = eager_load_path path @paths[symbol.to_sym] = [eager_path, ] mkdir!(eager_path) if [:mkdir] add_immutable_path(eager_path) if [:immutable] self end |
#basename(path) ⇒ Object
134 135 136 137 138 139 |
# File 'lib/masamune/filesystem.rb', line 134 def basename(path) return unless path node = remote_prefix(path) ? path.split(remote_prefix(path)).last : path return if node.nil? || node.blank? node.split('/').last end |
#cat(*files) ⇒ Object
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/masamune/filesystem.rb', line 377 def cat(*files) StringIO.new.tap do |buf| files.group_by { |path| type(path) }.each do |type, file_set| case type when :local file_set.map do |file| next unless File.exist?(file) next if File.directory?(file) buf << File.read(file) end end end buf.rewind end end |
#chown!(*files) ⇒ Object
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/masamune/filesystem.rb', line 403 def chown!(*files) opts = files.last.is_a?(Hash) ? files.pop : {} user = opts.fetch(:user, current_user) group = opts.fetch(:group, current_group) files.group_by { |path| type(path) }.each do |type, file_set| case type when :hdfs hadoop_fs('-chown', '-R', [user, group].compact.join(':'), *file_set) when :s3 nil # NOTE intentionally skip when :local FileUtils.chown_R(user, group, file_set, file_util_args) end end end |
#clear! ⇒ Object
40 41 42 |
# File 'lib/masamune/filesystem.rb', line 40 def clear! # Intentionally unimplemented end |
#copy_dir(src, dst) ⇒ Object
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/masamune/filesystem.rb', line 283 def copy_dir(src, dst) check_immutable_path!(dst) case [type(src), type(dst)] when %i[hdfs hdfs] copy_file_to_dir(src, dst) when %i[hdfs local] copy_file_to_dir(src, dst) when %i[hdfs s3] copy_file_to_dir(src, dst) when %i[s3 s3] s3cmd('cp', '--recursive', s3b(src, dir: true), s3b(dst, dir: true)) when %i[s3 local] fixed_dst = File.join(dst, src.split('/')[-1]) FileUtils.mkdir_p(fixed_dst, file_util_args) s3cmd('get', '--recursive', '--skip-existing', s3b(src, dir: true), fixed_dst) when %i[s3 hdfs] copy_file_to_dir(src, dst) when %i[local local] FileUtils.mkdir_p(dst, file_util_args) FileUtils.cp_r(src, dst, file_util_args) when %i[local hdfs] copy_file_to_dir(src, dst) when %i[local s3] s3cmd('put', '--recursive', src, s3b(dst, dir: true)) end end |
#copy_file_to_dir(src, dst) ⇒ Object
276 277 278 279 280 281 |
# File 'lib/masamune/filesystem.rb', line 276 def copy_file_to_dir(src, dst) check_immutable_path!(dst) mkdir!(dst) unless type(dst) == :s3 return false if dirname(src) == dst copy_file_helper(src, dst, true) end |
#copy_file_to_file(src, dst) ⇒ Object
270 271 272 273 274 |
# File 'lib/masamune/filesystem.rb', line 270 def copy_file_to_file(src, dst) check_immutable_path!(dst) mkdir!(dirname(dst)) unless type(dst) == :s3 copy_file_helper(src, dst, false) end |
#dirname(path) ⇒ Object
130 131 132 |
# File 'lib/masamune/filesystem.rb', line 130 def dirname(path) parent_paths(path).last || remote_prefix(path) || local_prefix(path) end |
#eval_path(path) ⇒ Object
78 79 80 |
# File 'lib/masamune/filesystem.rb', line 78 def eval_path(path) path.respond_to?(:call) ? path.call(self) : path end |
#exists?(file) ⇒ Boolean
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/masamune/filesystem.rb', line 160 def exists?(file) case type(file) when :hdfs hadoop_fs('-test', '-e', file, safe: true).success? when :s3 result = Set.new s3cmd('ls', s3b(file), safe: true) do |line| _date, _time, _size, name = line.split(/\s+/) result << (name == file) end result.any? when :local File.exist?(file) end end |
#expand_params(fs, path) ⇒ Object
82 83 84 85 86 87 88 |
# File 'lib/masamune/filesystem.rb', line 82 def (fs, path) new_path = path.dup fs.environment.configuration.params.each do |key, value| new_path.gsub!("%#{key}", value.to_s) end new_path end |
#get_path(symbol, *args) ⇒ Object Also known as: path
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/masamune/filesystem.rb', line 54 def get_path(symbol, *args) = args.last.is_a?(Hash) ? args.pop : {} lazy_path = lambda do |fs| fs.path?(symbol) || raise("Path :#{symbol} not defined") path, = fs.paths[symbol] mkdir!(path) if [:mkdir] (fs, args.any? ? File.join(path, args) : path) end if eager_load_paths? eager_load_path lazy_path.call(self) else lazy_path end end |
#glob(pattern, options = {}) ⇒ Object
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/masamune/filesystem.rb', line 228 def glob(pattern, = {}) return Set.new(to_enum(:glob, pattern, )) unless block_given? case type(pattern) when :hdfs _file_glob, file_regexp = glob_split(pattern) hadoop_fs('-ls', pattern, safe: true) do |line| next if line =~ /\AFound \d+ items/ name = line.split(/\s+/).last next unless name prefixed_name = remote_prefix(pattern) + name next unless prefixed_name && prefixed_name =~ file_regexp yield q(pattern, prefixed_name) end when :s3 file_glob, file_regexp = glob_split(pattern) s3cmd('ls', '--recursive', s3b(file_glob), safe: true) do |line| next if line =~ /\$folder$/ name = line.split(/\s+/).last next unless name && name =~ file_regexp yield q(pattern, name) end when :local Dir.glob(pattern.gsub(%r{/\*(\.\w+)?\Z}, '/**/*\1')) do |file| yield file end end end |
#glob_sort(pattern, options = {}) ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/masamune/filesystem.rb', line 256 def glob_sort(pattern, = {}) return to_enum(:glob_sort, pattern, ).to_a unless block_given? case [:order] when :basename glob(pattern).sort { |x, y| File.basename(x) <=> File.basename(y) }.each do |result| yield result end else glob(pattern) do |result| yield result end end end |
#glob_split(input, options = {}) ⇒ Object
432 433 434 |
# File 'lib/masamune/filesystem.rb', line 432 def glob_split(input, = {}) [input.include?('*') ? input.split('*').first + '*' : input, glob_to_regexp(input, )] end |
#glob_stat(pattern, options = {}) ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/masamune/filesystem.rb', line 176 def glob_stat(pattern, = {}) return Set.new(to_enum(:glob_stat, pattern, )) unless block_given? case type(pattern) when :hdfs hadoop_fs('-ls', '-R', pattern, safe: true) do |line| next if line =~ /\AFound \d+ items/ size, date, time, name = line.split(/\s+/).last(4) next unless size && date && time && name prefixed_name = remote_prefix(pattern) + name yield OpenStruct.new(name: prefixed_name, mtime: Time.parse("#{date} #{time} +0000").at_beginning_of_minute.utc, size: size.to_i) end when :s3 file_glob, file_regexp = glob_split(pattern, recursive: true) s3cmd('ls', '--recursive', s3b(file_glob), safe: true) do |line| next if line =~ /\$folder$/ date, time, size, name = line.split(/\s+/) next unless size && date && time && name next unless name =~ file_regexp yield OpenStruct.new(name: name, mtime: Time.parse("#{date} #{time} +0000").at_beginning_of_minute.utc, size: size.to_i) end when :local Dir.glob(pattern.gsub(%r{/\*(\.\w+)?\Z}, '/**/*\1')) do |file| stat = File.stat(file) yield OpenStruct.new(name: file, mtime: stat.mtime.at_beginning_of_minute.utc, size: stat.size.to_i) end end end |
#glob_to_regexp(input, options = {}) ⇒ Object
436 437 438 439 440 441 442 |
# File 'lib/masamune/filesystem.rb', line 436 def glob_to_regexp(input, = {}) if input.include?('*') || .fetch(:recursive, false) /\A#{Regexp.escape(input).gsub('\\*', '.*?').gsub(%r{\/\.\*\?\z}, '/?.*?')}/ else /\A#{Regexp.escape(input)}\z/ end end |
#mkdir!(*dirs) ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/masamune/filesystem.rb', line 213 def mkdir!(*dirs) dirs.uniq! dirs.group_by { |path| type(path) }.each do |type, dir_set| case type when :hdfs hadoop_fs('-mkdir', '-p', *dir_set) when :s3 touch!(*dir_set.map { |dir| File.join(dir, '.not_empty') }) when :local missing_dir_set = dir_set.reject { |dir| File.exist?(dir) } FileUtils.mkdir_p(missing_dir_set, file_util_args) if missing_dir_set.any? end end end |
#mktemp!(path) ⇒ Object
420 421 422 423 424 |
# File 'lib/masamune/filesystem.rb', line 420 def mktemp!(path) get_path(path, SecureRandom.urlsafe_base64).tap do |file| touch!(file) end end |
#mktempdir!(path) ⇒ Object
426 427 428 429 430 |
# File 'lib/masamune/filesystem.rb', line 426 def mktempdir!(path) get_path(path, SecureRandom.urlsafe_base64).tap do |dir| mkdir!(dir) end end |
#move_dir(src, dst) ⇒ Object
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 |
# File 'lib/masamune/filesystem.rb', line 348 def move_dir(src, dst) check_immutable_path!(src) case [type(src), type(dst)] when %i[hdfs hdfs] move_file_to_file(src, dst) when %i[hdfs local] copy_file_to_dir(src, dst) remove_dir(src) when %i[s3 s3] s3cmd('mv', '--recursive', d(src), f(dst)) when %i[s3 local] s3cmd('get', '--recursive', d(src), f(dst)) remove_dir(src) when %i[s3 hdfs] copy_file_to_dir(src, dst) remove_dir(src) when %i[hdfs s3] copy_file_to_dir(src, d(dst)) remove_dir(src) when %i[local local] move_file_to_file(src, dst) when %i[local hdfs] move_file_to_file(src, dst) when %i[local s3] s3cmd('put', '--recursive', d(src), d(dst)) remove_dir(src) end end |
#move_file_to_dir(src, dst) ⇒ Object
342 343 344 345 346 |
# File 'lib/masamune/filesystem.rb', line 342 def move_file_to_dir(src, dst) check_immutable_path!(src) mkdir!(dst) unless type(dst) == :s3 move_file_helper(src, dst, true) end |
#move_file_to_file(src, dst) ⇒ Object
336 337 338 339 340 |
# File 'lib/masamune/filesystem.rb', line 336 def move_file_to_file(src, dst) check_immutable_path!(src) mkdir!(dirname(dst)) unless type(dst) == :s3 move_file_helper(src, dst, false) end |
#parent_paths(path) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/masamune/filesystem.rb', line 95 def parent_paths(path) prefix = remote_prefix(path) if prefix node = path.split(prefix).last else prefix = '' node = path end return [] if prefix.blank? && node.blank? parent_paths = node ? File.(node, '/').split('/') : [] parent_paths.reject!(&:blank?) parent_paths.prepend('/') if node =~ %r{\A/} tmp = [] result = [] parent_paths.each do |part| tmp << part current_path = prefix + File.join(tmp) result << current_path end result.pop result end |
#path?(symbol) ⇒ Boolean
72 73 74 |
# File 'lib/masamune/filesystem.rb', line 72 def path?(symbol) @paths.key?(symbol) end |
#relative_path?(path) ⇒ Boolean
90 91 92 93 |
# File 'lib/masamune/filesystem.rb', line 90 def relative_path?(path) return false if remote_prefix(path) path[0] != '/' end |
#remove_dir(dir) ⇒ Object
322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/masamune/filesystem.rb', line 322 def remove_dir(dir) raise "#{dir} is root path, cannot remove" if root_path?(dir) check_immutable_path!(dir) case type(dir) when :hdfs hadoop_fs('-rm', '-r', dir) when :s3 s3cmd('del', '--recursive', s3b(dir, dir: true)) s3cmd('del', '--recursive', s3b("#{dir}_$folder$")) when :local FileUtils.rmtree(dir, file_util_args) end end |
#remove_file(file) ⇒ Object
310 311 312 313 314 315 316 317 318 319 320 |
# File 'lib/masamune/filesystem.rb', line 310 def remove_file(file) check_immutable_path!(file) case type(file) when :hdfs hadoop_fs('-rm', file) when :s3 s3cmd('del', s3b(file, dir: false)) when :local FileUtils.rm(file, file_util_args) end end |
#resolve_file(paths = []) ⇒ Object
126 127 128 |
# File 'lib/masamune/filesystem.rb', line 126 def resolve_file(paths = []) Array.wrap(paths).select { |path| File.exist?(path) && File.file?(path) }.first end |
#root_path?(path) ⇒ Boolean
119 120 121 122 123 124 |
# File 'lib/masamune/filesystem.rb', line 119 def root_path?(path) raise ArgumentError, 'path cannot be nil' if path.nil? raise ArgumentError, 'path cannot be blank' if path.blank? raise ArgumentError, 'path cannot be relative' if relative_path?(path) parent_paths(path).count < 1 end |
#stat(file_or_dir) ⇒ Object
204 205 206 207 208 209 210 211 |
# File 'lib/masamune/filesystem.rb', line 204 def stat(file_or_dir) raise ArgumentError, 'cannot contain wildcard' if file_or_dir.include?('*') result = glob_stat(file_or_dir) return unless result.any? max_time = result.map { |stat| stat.try(:mtime) }.compact.max sum_size = result.map { |stat| stat.try(:size) }.compact.reduce(:+) OpenStruct.new(name: file_or_dir, mtime: max_time, size: sum_size) end |
#touch!(*files) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/masamune/filesystem.rb', line 141 def touch!(*files) files.uniq! files.group_by { |path| type(path) }.each do |type, file_set| mkdir!(*file_set.map { |file| File.dirname(file) }) unless type == :s3 case type when :hdfs hadoop_fs('-touchz', *file_set) when :s3 empty = Tempfile.new('masamune_empty') file_set.each do |file| s3cmd('put', empty.path, s3b(file, dir: false)) end when :local FileUtils.touch(file_set, file_util_args) FileUtils.chmod(FILE_MODE, file_set, file_util_args) end end end |
#write(buf, dst) ⇒ Object
393 394 395 396 397 398 399 400 401 |
# File 'lib/masamune/filesystem.rb', line 393 def write(buf, dst) case type(dst) when :local mkdir!(File.dirname(dst)) File.open(dst, 'w') do |file| file.write buf end end end |