Class: Masamune::Filesystem

Inherits:
Object
Includes:
Actions::HadoopFilesystem, Actions::S3Cmd, HasEnvironment
Defined in:
lib/masamune/filesystem.rb

Constant Summary

FILE_MODE =
0777 - File.umask

Instance Attribute Summary

Instance Method Summary

Methods included from Actions::HadoopFilesystem

#hadoop_filesystem

Methods included from Actions::S3Cmd

#s3_sync, #s3cmd

Methods included from Commands::S3Cmd::ClassMethods

#s3b, #s3n

Methods included from HasEnvironment

#environment, #environment=

Constructor Details

#initialize ⇒ Filesystem

Returns a new instance of Filesystem.



35
36
37
38
# File 'lib/masamune/filesystem.rb', line 35

# Builds a filesystem with no registered paths.
#
# @paths holds the named paths registered through #add_path; @immutable_paths
# starts empty (presumably filled by add_immutable_path — confirm).
def initialize
  @immutable_paths = {}
  @paths = {}
end

Instance Attribute Details

#paths ⇒ Object (readonly)

Returns the value of attribute paths.



76
77
78
# File 'lib/masamune/filesystem.rb', line 76

# Registered paths: a Hash from each name (a Symbol) to the [path, options]
# pair stored by #add_path.
def paths; @paths; end

Instance Method Details

#add_path(symbol, path, options = {}) ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/masamune/filesystem.rb', line 44

# Registers +path+ under the name +symbol+ for later lookup via #get_path.
#
# @param symbol [Symbol, String] name of the path; stored as a Symbol
# @param path [Object] the path; it is passed through eager_load_path
#   before being stored
# @param options [Hash, nil] :mkdir creates the directory immediately,
#   :immutable registers it with add_immutable_path; keys may be strings
# @return [self]
def add_path(symbol, path, options = {})
  # Work on a symbolized copy. symbolize_keys! mutated the caller's hash and
  # raised FrozenError when given a frozen options hash.
  options = (options || {}).symbolize_keys
  eager_path = eager_load_path path
  @paths[symbol.to_sym] = [eager_path, options]
  mkdir!(eager_path) if options[:mkdir]
  add_immutable_path(eager_path) if options[:immutable]
  self
end

#basename(path) ⇒ Object



134
135
136
137
138
139
# File 'lib/masamune/filesystem.rb', line 134

# Final '/'-separated component of +path+, after dropping whatever
# remote_prefix reports for it.
#
# @return [String, nil] nil for a falsy path or when nothing follows the prefix
def basename(path)
  return unless path
  prefix = remote_prefix(path)
  remainder = prefix ? path.split(prefix).last : path
  remainder.split('/').last unless remainder.nil? || remainder.blank?
end

#cat(*files) ⇒ Object



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/masamune/filesystem.rb', line 377

# Concatenates the contents of +files+ into an in-memory buffer.
#
# Only paths whose type is :local are read; missing paths, directories and
# paths of any other type are skipped.
#
# @return [StringIO] buffer rewound to the start, ready to read
def cat(*files)
  StringIO.new.tap do |buf|
    files.group_by { |path| type(path) }.each do |kind, file_set|
      next unless kind == :local

      # each (not map): the loop exists only for its side effect on buf
      file_set.each do |file|
        next unless File.exist?(file)
        next if File.directory?(file)
        buf << File.read(file)
      end
    end
    buf.rewind
  end
end

#chown!(*files) ⇒ Object



403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/masamune/filesystem.rb', line 403

# Recursively changes ownership of +files+.
#
# A trailing Hash may supply :user and :group; they default to current_user
# and current_group. HDFS paths use `-chown -R user:group`, local paths use
# FileUtils.chown_R, and S3 paths are deliberately skipped.
def chown!(*files)
  opts = {}
  opts = files.pop if files.last.is_a?(Hash)
  user = opts.fetch(:user, current_user)
  group = opts.fetch(:group, current_group)

  files.group_by { |path| type(path) }.each do |kind, targets|
    if kind == :hdfs
      owner = [user, group].compact.join(':')
      hadoop_fs('-chown', '-R', owner, *targets)
    elsif kind == :s3
      nil # NOTE intentionally skip
    elsif kind == :local
      FileUtils.chown_R(user, group, targets, file_util_args)
    end
  end
end

#clear! ⇒ Object



40
41
42
# File 'lib/masamune/filesystem.rb', line 40

# Deliberate no-op: clearing is intentionally left unimplemented here.
#
# @return [nil]
def clear!
  nil
end

#copy_dir(src, dst) ⇒ Object



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/masamune/filesystem.rb', line 283

# Recursively copies directory +src+ to +dst+, dispatching on the
# (source type, destination type) pair. +dst+ is checked with
# check_immutable_path! before anything is copied.
def copy_dir(src, dst)
  check_immutable_path!(dst)
  case [type(src), type(dst)]
  when %i[hdfs hdfs], %i[hdfs local], %i[hdfs s3], %i[s3 hdfs], %i[local hdfs]
    # Every transfer involving HDFS goes through the generic file-to-dir copy.
    copy_file_to_dir(src, dst)
  when %i[s3 s3]
    s3cmd('cp', '--recursive', s3b(src, dir: true), s3b(dst, dir: true))
  when %i[s3 local]
    # Download into dst/<last component of src>, not into dst itself.
    target = File.join(dst, src.split('/')[-1])
    FileUtils.mkdir_p(target, file_util_args)
    s3cmd('get', '--recursive', '--skip-existing', s3b(src, dir: true), target)
  when %i[local s3]
    s3cmd('put', '--recursive', src, s3b(dst, dir: true))
  when %i[local local]
    FileUtils.mkdir_p(dst, file_util_args)
    FileUtils.cp_r(src, dst, file_util_args)
  end
end

#copy_file_to_dir(src, dst) ⇒ Object



276
277
278
279
280
281
# File 'lib/masamune/filesystem.rb', line 276

# Copies +src+ into directory +dst+, creating +dst+ first unless it is on S3.
#
# @return [false] when dirname(src) already equals +dst+ (nothing copied)
# @return [Object] otherwise whatever copy_file_helper returns
def copy_file_to_dir(src, dst)
  check_immutable_path!(dst)
  mkdir!(dst) if type(dst) != :s3
  dirname(src) == dst ? false : copy_file_helper(src, dst, true)
end

#copy_file_to_file(src, dst) ⇒ Object



270
271
272
273
274
# File 'lib/masamune/filesystem.rb', line 270

# Copies file +src+ to the exact path +dst+. +dst+ is checked with
# check_immutable_path!, and its parent directory is created first unless
# +dst+ is on S3.
def copy_file_to_file(src, dst)
  check_immutable_path!(dst)
  local_or_hdfs = type(dst) != :s3
  mkdir!(dirname(dst)) if local_or_hdfs
  copy_file_helper(src, dst, false)
end

#dirname(path) ⇒ Object



130
131
132
# File 'lib/masamune/filesystem.rb', line 130

# Parent of +path+: its deepest parent path. When there is none, falls back
# to the remote prefix and then to the local prefix.
def dirname(path)
  parent = parent_paths(path).last
  return parent if parent

  remote_prefix(path) || local_prefix(path)
end

#eval_path(path) ⇒ Object



78
79
80
# File 'lib/masamune/filesystem.rb', line 78

# Resolves a possibly lazy path. Callables, such as the lambdas #get_path
# returns when paths are not eager-loaded, are invoked with this filesystem.
# Any other value is returned unchanged.
def eval_path(path)
  return path unless path.respond_to?(:call)

  path.call(self)
end

#exists?(file) ⇒ Boolean

Returns:

  • (Boolean)


160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/masamune/filesystem.rb', line 160

# Whether +file+ exists on its filesystem.
#
# HDFS uses `-test -e`. S3 scans an `ls` listing for an entry whose fourth
# field equals +file+ exactly. Local paths use File.exist?.
#
# @return [Boolean, nil] nil when the path's type is not handled
def exists?(file)
  case type(file)
  when :hdfs
    hadoop_fs('-test', '-e', file, safe: true).success?
  when :s3
    found = false
    s3cmd('ls', s3b(file), safe: true) do |line|
      listed_name = line.split(/\s+/)[3]
      found = true if listed_name == file
    end
    found
  when :local
    File.exist?(file)
  end
end

#expand_params(fs, path) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/masamune/filesystem.rb', line 82

# Substitutes configuration parameters into +path+.
#
# Each "%key" occurrence is replaced by the matching value from
# fs.environment.configuration.params.
#
# Longer keys are substituted first, so a key that is a prefix of another
# (e.g. %foo vs %foobar) cannot clobber part of the longer placeholder.
# Values are inserted via the block form of gsub!, so backslash sequences in
# a value are taken literally instead of as backreferences.
#
# @param fs [#environment] object exposing environment.configuration.params
# @param path [String] template path; it is not modified
# @return [String] a new String with parameters expanded
def expand_params(fs, path)
  params = fs.environment.configuration.params
  ordered = params.sort_by { |key, _value| -key.to_s.length }
  ordered.each_with_object(path.dup) do |(key, value), new_path|
    new_path.gsub!("%#{key}") { value.to_s }
  end
end

#get_path(symbol, *args) ⇒ Object Also known as: path



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/masamune/filesystem.rb', line 54

# Resolves the path registered under +symbol+, optionally joined with the
# extra components in +args+, with configuration params expanded.
#
# A trailing Hash in +args+ is removed and ignored. If the registered
# options include :mkdir, the base path is created on resolution. When
# eager_load_paths? is true the path is resolved now; otherwise a lambda
# taking the filesystem is returned for later evaluation (see #eval_path).
#
# @raise [RuntimeError] when no path is registered under +symbol+
def get_path(symbol, *args)
  args.pop if args.last.is_a?(Hash)
  # add_path stores names via to_sym, so look them up the same way; string
  # names that add_path accepted were otherwise never found.
  key = symbol.respond_to?(:to_sym) ? symbol.to_sym : symbol
  lazy_path = lambda do |fs|
    fs.path?(key) || raise("Path :#{symbol} not defined")
    # A distinct name so the closure no longer reassigns an outer local.
    path, path_options = fs.paths[key]

    mkdir!(path) if path_options[:mkdir]
    expand_params(fs, args.any? ? File.join(path, args) : path)
  end

  if eager_load_paths?
    eager_load_path lazy_path.call(self)
  else
    lazy_path
  end
end

#glob(pattern, options = {}) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/masamune/filesystem.rb', line 228

# Enumerates paths matching the wildcard +pattern+.
#
# Without a block, returns a Set of every match. HDFS and S3 listings are
# filtered with the regexp from glob_split. A local pattern ending in "/*"
# or "/*.ext" is widened to a recursive "/**/*" glob. +options+ is only
# forwarded to the enumerator.
def glob(pattern, options = {})
  return Set.new(to_enum(:glob, pattern, options)) unless block_given?

  case type(pattern)
  when :hdfs
    regexp = glob_split(pattern).last
    hadoop_fs('-ls', pattern, safe: true) do |line|
      next if line =~ /\AFound \d+ items/
      entry = line.split(/\s+/).last
      next if entry.nil?
      full_name = remote_prefix(pattern) + entry
      yield q(pattern, full_name) if full_name =~ regexp
    end
  when :s3
    prefix_glob, regexp = glob_split(pattern)
    s3cmd('ls', '--recursive', s3b(prefix_glob), safe: true) do |line|
      next if line =~ /\$folder$/
      entry = line.split(/\s+/).last
      yield q(pattern, entry) if entry && entry =~ regexp
    end
  when :local
    recursive_pattern = pattern.gsub(%r{/\*(\.\w+)?\Z}, '/**/*\1')
    Dir.glob(recursive_pattern) { |file| yield file }
  end
end

#glob_sort(pattern, options = {}) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/masamune/filesystem.rb', line 256

# Like #glob, but with options[:order] == :basename the matches are yielded
# sorted by their final path component. Otherwise they are yielded in glob's
# order.
#
# Without a block, returns the matches as an Array.
def glob_sort(pattern, options = {}, &block)
  return to_enum(:glob_sort, pattern, options).to_a unless block

  if options[:order] == :basename
    ordered = glob(pattern).sort { |a, b| File.basename(a) <=> File.basename(b) }
    ordered.each(&block)
  else
    glob(pattern, &block)
  end
end

#glob_split(input, options = {}) ⇒ Object



432
433
434
# File 'lib/masamune/filesystem.rb', line 432

# Splits the glob +input+ into a listing prefix and a filter regexp.
#
# The prefix is +input+ up to and including its first '*', or +input+
# itself when it has no wildcard. It is what gets handed to a listing
# command; the regexp from glob_to_regexp then filters the results.
#
# @return [Array(String, Regexp)]
def glob_split(input, options = {})
  star = input.index('*')
  # Slice by index rather than split('*').first + '*'. split drops empty
  # fields, so an input made only of '*' produced nil + '*' (NoMethodError).
  prefix = star ? input[0..star] : input
  [prefix, glob_to_regexp(input, options)]
end

#glob_stat(pattern, options = {}) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/masamune/filesystem.rb', line 176

# Enumerates metadata for everything matching +pattern+.
#
# Yields OpenStructs with :name, :mtime and :size; without a block, collects
# them into a Set. mtime is truncated to the start of its minute and
# converted to UTC, and timestamps from remote listings are parsed with a
# "+0000" offset.
#
# HDFS is listed with `-ls -R`. S3 results are filtered with glob_split's
# recursive regexp. A local pattern ending in "/*" or "/*.ext" is widened
# to "/**/*".
def glob_stat(pattern, options = {})
  return Set.new(to_enum(:glob_stat, pattern, options)) unless block_given?

  listed_mtime = ->(date, time) { Time.parse("#{date} #{time} +0000").at_beginning_of_minute.utc }

  case type(pattern)
  when :hdfs
    hadoop_fs('-ls', '-R', pattern, safe: true) do |line|
      next if line =~ /\AFound \d+ items/
      size, date, time, name = line.split(/\s+/).last(4)
      next unless size && date && time && name
      full_name = remote_prefix(pattern) + name
      yield OpenStruct.new(name: full_name, mtime: listed_mtime.call(date, time), size: size.to_i)
    end
  when :s3
    prefix_glob, regexp = glob_split(pattern, recursive: true)
    s3cmd('ls', '--recursive', s3b(prefix_glob), safe: true) do |line|
      next if line =~ /\$folder$/
      date, time, size, name = line.split(/\s+/)
      next unless size && date && time && name
      next unless name =~ regexp
      yield OpenStruct.new(name: name, mtime: listed_mtime.call(date, time), size: size.to_i)
    end
  when :local
    Dir.glob(pattern.gsub(%r{/\*(\.\w+)?\Z}, '/**/*\1')) do |file|
      info = File.stat(file)
      yield OpenStruct.new(name: file, mtime: info.mtime.at_beginning_of_minute.utc, size: info.size.to_i)
    end
  end
end

#glob_to_regexp(input, options = {}) ⇒ Object



436
437
438
439
440
441
442
# File 'lib/masamune/filesystem.rb', line 436

# Converts the glob +input+ into a Regexp anchored at the start.
#
# Each '*' becomes a lazy '.*?', and a trailing "/*" becomes "/?.*?" so the
# directory itself also matches. Input without '*' is anchored at both ends,
# unless options[:recursive] is set. In that case only the start is
# anchored, so any string beginning with +input+ matches.
def glob_to_regexp(input, options = {})
  escaped = Regexp.escape(input)
  open_ended = input.include?('*') || options.fetch(:recursive, false)
  return /\A#{escaped}\z/ unless open_ended

  pattern = escaped.gsub('\\*', '.*?').gsub(%r{\/\.\*\?\z}, '/?.*?')
  /\A#{pattern}/
end

#mkdir!(*dirs) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/masamune/filesystem.rb', line 213

# Creates each directory in +dirs+ (with parents), ignoring duplicates.
#
# - HDFS: one `-mkdir -p` call.
# - S3: a '.not_empty' object is touched inside each directory.
# - Local: only directories that do not exist yet are created.
#
# NOTE(review): file_util_args is passed positionally to FileUtils; on
# Ruby 3 FileUtils takes these options as keywords only — confirm.
def mkdir!(*dirs)
  dirs.uniq.group_by { |dir| type(dir) }.each do |kind, group|
    case kind
    when :hdfs
      hadoop_fs('-mkdir', '-p', *group)
    when :s3
      markers = group.map { |dir| File.join(dir, '.not_empty') }
      touch!(*markers)
    when :local
      missing = group.reject { |dir| File.exist?(dir) }
      FileUtils.mkdir_p(missing, file_util_args) unless missing.empty?
    end
  end
end

#mktemp!(path) ⇒ Object



420
421
422
423
424
# File 'lib/masamune/filesystem.rb', line 420

# Touches a randomly named file under the registered path +path+ and returns
# whatever get_path produced for it.
def mktemp!(path)
  file = get_path(path, SecureRandom.urlsafe_base64)
  touch!(file)
  file
end

#mktempdir!(path) ⇒ Object



426
427
428
429
430
# File 'lib/masamune/filesystem.rb', line 426

# Creates a randomly named directory under the registered path +path+ and
# returns whatever get_path produced for it.
def mktempdir!(path)
  dir = get_path(path, SecureRandom.urlsafe_base64)
  mkdir!(dir)
  dir
end

#move_dir(src, dst) ⇒ Object



348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/masamune/filesystem.rb', line 348

# Moves directory +src+ to +dst+, dispatching on the (source type,
# destination type) pair. +src+ is checked with check_immutable_path! first.
# Cross-system moves are done as a copy followed by remove_dir(src), except
# S3-to-S3, which uses `s3cmd mv`.
def move_dir(src, dst)
  check_immutable_path!(src)
  case [type(src), type(dst)]
  when %i[hdfs hdfs], %i[local local], %i[local hdfs]
    move_file_to_file(src, dst)
  when %i[hdfs local], %i[s3 hdfs]
    copy_file_to_dir(src, dst)
    remove_dir(src)
  when %i[hdfs s3]
    copy_file_to_dir(src, d(dst))
    remove_dir(src)
  when %i[s3 s3]
    s3cmd('mv', '--recursive', d(src), f(dst))
  when %i[s3 local]
    s3cmd('get', '--recursive', d(src), f(dst))
    remove_dir(src)
  when %i[local s3]
    s3cmd('put', '--recursive', d(src), d(dst))
    remove_dir(src)
  end
end

#move_file_to_dir(src, dst) ⇒ Object



342
343
344
345
346
# File 'lib/masamune/filesystem.rb', line 342

# Moves file +src+ into directory +dst+. +src+ (not +dst+) is checked with
# check_immutable_path!. +dst+ is created first unless it is on S3.
def move_file_to_dir(src, dst)
  check_immutable_path!(src)
  mkdir!(dst) if type(dst) != :s3
  move_file_helper(src, dst, true)
end

#move_file_to_file(src, dst) ⇒ Object



336
337
338
339
340
# File 'lib/masamune/filesystem.rb', line 336

# Moves file +src+ to the exact path +dst+. +src+ (not +dst+) is checked with
# check_immutable_path!. dst's parent directory is created first unless
# +dst+ is on S3.
def move_file_to_file(src, dst)
  check_immutable_path!(src)
  needs_parent = type(dst) != :s3
  mkdir!(dirname(dst)) if needs_parent
  move_file_helper(src, dst, false)
end

#parent_paths(path) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/masamune/filesystem.rb', line 95

# Lists the ancestors of +path+, shallowest first, excluding +path+ itself.
#
# Any prefix reported by remote_prefix is split off first and re-attached to
# every result. The remainder is normalized with File.expand_path anchored
# at '/', which collapses '.' and '..' segments. An absolute remainder such
# as "/a/b" yields ["/", "/a"] (each with the prefix prepended). A relative
# one such as "a/b" yields ["a"].
#
# NOTE(review): File.expand_path also expands a leading '~' to a home
# directory; confirm that such input cannot reach this method.
#
# @param path [String]
# @return [Array<String>] ancestors; empty when +path+ has none
def parent_paths(path)
  prefix = remote_prefix(path)
  if prefix
    node = path.split(prefix).last
  else
    prefix = ''
    node = path
  end

  return [] if prefix.blank? && node.blank?
  # Anchoring at '/' lets expand_path normalize relative remainders too; the
  # leading '/' it introduces is stripped by reject! below.
  parent_paths = node ? File.expand_path(node, '/').split('/') : []
  parent_paths.reject!(&:blank?)
  # Restore the root component only when the original remainder was absolute.
  parent_paths.prepend('/') if node =~ %r{\A/}
  tmp = []
  result = []
  parent_paths.each do |part|
    tmp << part
    current_path = prefix + File.join(tmp)
    result << current_path
  end
  # The final accumulated entry is +path+ itself, which is not an ancestor.
  result.pop
  result
end

#path?(symbol) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/masamune/filesystem.rb', line 72

# Whether a path is registered under the name +symbol+.
#
# add_path stores names via to_sym, so the lookup normalizes the same way and
# string names are found too. Values that cannot become a Symbol (e.g. nil)
# are never registered and yield false.
#
# @return [Boolean]
def path?(symbol)
  symbol.respond_to?(:to_sym) && @paths.key?(symbol.to_sym)
end

#relative_path?(path) ⇒ Boolean

Returns:

  • (Boolean)


90
91
92
93
# File 'lib/masamune/filesystem.rb', line 90

# True unless +path+ carries a remote prefix or starts with '/'.
#
# @return [Boolean]
def relative_path?(path)
  remote_prefix(path) ? false : !path.start_with?('/')
end

#remove_dir(dir) ⇒ Object



322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/masamune/filesystem.rb', line 322

# Recursively deletes directory +dir+.
#
# Root paths are refused, and +dir+ is checked with check_immutable_path!
# before anything is deleted. On S3 the "<dir>_$folder$" key is deleted
# along with the directory's contents.
#
# @raise [RuntimeError] if +dir+ is a root path
def remove_dir(dir)
  raise "#{dir} is root path, cannot remove" if root_path?(dir)
  check_immutable_path!(dir)

  kind = type(dir)
  if kind == :hdfs
    hadoop_fs('-rm', '-r', dir)
  elsif kind == :s3
    s3cmd('del', '--recursive', s3b(dir, dir: true))
    s3cmd('del', '--recursive', s3b("#{dir}_$folder$"))
  elsif kind == :local
    FileUtils.rmtree(dir, file_util_args)
  end
end

#remove_file(file) ⇒ Object



310
311
312
313
314
315
316
317
318
319
320
# File 'lib/masamune/filesystem.rb', line 310

# Deletes the single file +file+ after checking it with check_immutable_path!.
# Returns the underlying command's result, or nil for an unhandled type.
def remove_file(file)
  check_immutable_path!(file)
  kind = type(file)
  return hadoop_fs('-rm', file) if kind == :hdfs
  return s3cmd('del', s3b(file, dir: false)) if kind == :s3

  FileUtils.rm(file, file_util_args) if kind == :local
end

#resolve_file(paths = []) ⇒ Object



126
127
128
# File 'lib/masamune/filesystem.rb', line 126

# First candidate in +paths+ that is an existing regular file.
#
# @param paths [String, Array<String>, nil] candidates, in priority order
# @return [String, nil] the first match, or nil when none qualify
def resolve_file(paths = [])
  # File.file? is already false for missing paths, and find stops at the
  # first hit instead of stat-ing every remaining candidate.
  Array.wrap(paths).find { |path| File.file?(path) }
end

#root_path?(path) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (ArgumentError)


119
120
121
122
123
124
# File 'lib/masamune/filesystem.rb', line 119

# Whether +path+ is a root, i.e. has no parent paths.
#
# @raise [ArgumentError] if +path+ is nil, blank, or relative (checked in
#   that order)
def root_path?(path)
  problem =
    if path.nil? then 'path cannot be nil'
    elsif path.blank? then 'path cannot be blank'
    elsif relative_path?(path) then 'path cannot be relative'
    end
  raise ArgumentError, problem if problem

  parent_paths(path).empty?
end

#stat(file_or_dir) ⇒ Object

Raises:

  • (ArgumentError)


204
205
206
207
208
209
210
211
# File 'lib/masamune/filesystem.rb', line 204

# Aggregate metadata for the file or directory +file_or_dir+.
#
# Combines every glob_stat entry into one OpenStruct: the latest mtime and
# the summed size of all entries.
#
# @return [OpenStruct, nil] nil when nothing matches
# @raise [ArgumentError] if +file_or_dir+ contains a '*'
def stat(file_or_dir)
  raise ArgumentError, 'cannot contain wildcard' if file_or_dir.include?('*')
  entries = glob_stat(file_or_dir)
  return if entries.none?

  latest = entries.map { |entry| entry.try(:mtime) }.compact.max
  total = entries.map { |entry| entry.try(:size) }.compact.reduce(:+)
  OpenStruct.new(name: file_or_dir, mtime: latest, size: total)
end

#touch!(*files) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/masamune/filesystem.rb', line 141

# Creates each file in +files+ (duplicates ignored) as an empty file.
#
# - HDFS: parent directories are created, then `-touchz` runs.
# - S3: the same empty local file is uploaded to each key.
# - Local: parent directories are created, then the files are touched and
#   chmod'ed to FILE_MODE.
def touch!(*files)
  files.uniq!
  files.group_by { |path| type(path) }.each do |kind, file_set|
    mkdir!(*file_set.map { |file| File.dirname(file) }) unless kind == :s3
    case kind
    when :hdfs
      hadoop_fs('-touchz', *file_set)
    when :s3
      empty = Tempfile.new('masamune_empty')
      begin
        file_set.each do |file|
          s3cmd('put', empty.path, s3b(file, dir: false))
        end
      ensure
        # Close and unlink the scratch file. It used to stay open and on disk
        # until garbage collection.
        empty.close!
      end
    when :local
      FileUtils.touch(file_set, file_util_args)
      FileUtils.chmod(FILE_MODE, file_set, file_util_args)
    end
  end
end

#write(buf, dst) ⇒ Object



393
394
395
396
397
398
399
400
401
# File 'lib/masamune/filesystem.rb', line 393

# Writes +buf+ to +dst+, replacing any existing content, after creating the
# parent directory.
#
# Only local destinations are handled; for any other type nothing is written
# and nil is returned.
def write(buf, dst)
  return unless type(dst) == :local

  mkdir!(File.dirname(dst))
  File.write(dst, buf)
end