Class: Etna::Filesystem::Metis

Inherits:
Etna::Filesystem
Defined in:
lib/etna/filesystem.rb

Instance Method Summary

Methods inherited from Etna::Filesystem

#mv, #rm_rf, #tmpdir

Constructor Details

#initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid) ⇒ Metis

Returns a new instance of Metis.



248
249
250
251
252
253
254
# File 'lib/etna/filesystem.rb', line 248

# Stores the collaborators and coordinates this filesystem works with.
#
# @param metis_client client object the other methods send Metis requests through
# @param project_name project name passed along with every request and workflow
# @param bucket_name bucket name passed along with every request and workflow
# @param root prefix that metis_path_of joins in front of paths (defaults to '/')
# @param uuid value kept as @metis_uid and handed to upload workflows
#   (defaults to a freshly generated SecureRandom.uuid)
def initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid)
  @metis_client, @project_name, @bucket_name = metis_client, project_name, bucket_name
  @root = root
  @metis_uid = uuid
end

Instance Method Details

#create_download_workflow ⇒ Object



319
320
321
# File 'lib/etna/filesystem.rb', line 319

# Builds a new download workflow bound to this filesystem's client, project
# and bucket, with max_attempts set to 3.
#
# @return [Etna::Clients::Metis::MetisDownloadWorkflow]
def create_download_workflow
  workflow_options = {
    metis_client: @metis_client,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisDownloadWorkflow.new(**workflow_options)
end

#create_upload_workflow ⇒ Object



261
262
263
# File 'lib/etna/filesystem.rb', line 261

# Builds a new upload workflow bound to this filesystem's client, uid,
# project and bucket, with max_attempts set to 3.
#
# @return [Etna::Clients::Metis::MetisUploadWorkflow]
def create_upload_workflow
  workflow_options = {
    metis_client: @metis_client,
    metis_uid: @metis_uid,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisUploadWorkflow.new(**workflow_options)
end

#do_streaming_download(wp, metis_file) ⇒ Object



323
324
325
# File 'lib/etna/filesystem.rb', line 323

# Delegates to do_download on a newly created download workflow.
#
# @param wp IO-like object forwarded as the first argument of do_download
# @param metis_file file record forwarded as the second argument of do_download
# @return whatever do_download returns
def do_streaming_download(wp, metis_file)
  downloader = create_download_workflow
  downloader.do_download(wp, metis_file)
end

#do_streaming_upload(rp, dest, size_hint) ⇒ Object



305
306
307
308
309
310
311
# File 'lib/etna/filesystem.rb', line 305

# Wraps rp in a StreamingIOUpload and hands it to a new upload workflow,
# targeting the Metis path that metis_path_of derives from dest.
#
# @param rp readable IO passed to StreamingIOUpload as readable_io
# @param dest destination path, resolved through metis_path_of
# @param size_hint value passed to StreamingIOUpload as size_hint
# @return whatever do_upload returns
def do_streaming_upload(rp, dest, size_hint)
  upload_source = Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload.new(
    readable_io: rp,
    size_hint: size_hint
  )
  uploader = create_upload_workflow
  uploader.do_upload(upload_source, metis_path_of(dest))
end

#exist?(src) ⇒ Boolean

Returns:

  • (Boolean)


354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/etna/filesystem.rb', line 354

# Reports whether a file or folder named like the last component of src
# appears in the listing of src's parent directory.
#
# @param src [String] path to look for
# @return [Boolean] false when listing the parent fails with status 404 or
#   with a message matching "Invalid folder"
# @raise [Etna::Error] any other listing error is re-raised
def exist?(src)
  listing =
    begin
      list_metis_directory(::File.dirname(src))
    rescue Etna::Error => e
      # A missing or invalid parent folder means the entry cannot exist.
      return false if e.status == 404 || e.message.match?(/Invalid folder/)

      raise e
    end

  wanted = ::File.basename(src)
  listing.files.all.any? { |entry| entry.file_name == wanted } ||
    listing.folders.all.any? { |entry| entry.folder_name == wanted }
end

#list_metis_directory(path) ⇒ Object



336
337
338
# File 'lib/etna/filesystem.rb', line 336

# Sends a ListFolderRequest for path (resolved through metis_path_of) in this
# filesystem's project and bucket.
#
# @param path folder path to list
# @return whatever the Metis client's list_folder returns
def list_metis_directory(path)
  request = Etna::Clients::Metis::ListFolderRequest.new(
    project_name: @project_name,
    bucket_name: @bucket_name,
    folder_path: metis_path_of(path)
  )
  @metis_client.list_folder(request)
end

#ls(dir) ⇒ Object



349
350
351
352
# File 'lib/etna/filesystem.rb', line 349

# Lists the entries of a Metis folder.
#
# @param dir folder path to list (resolved by list_metis_directory)
# @return [Array<Array>] [:file, file_name] pairs followed by
#   [:folder, folder_name] pairs
def ls(dir)
  # List dir itself; taking File.dirname here would list its parent instead.
  response = list_metis_directory(dir)

  # Go through #all, as exist? and with_readable do, instead of calling
  # enumerable methods directly on the files/folders wrappers.
  file_entries = response.files.all.map { |f| [:file, f.file_name] }
  folder_entries = response.folders.all.map { |f| [:folder, f.folder_name] }
  file_entries + folder_entries
end

#metis_path_of(path) ⇒ Object



256
257
258
259
# File 'lib/etna/filesystem.rb', line 256

# Joins path onto @root and strips one leading "/" from the result, if present.
#
# @param path [String] path relative to this filesystem's root
# @return [String] the joined path without a leading slash
def metis_path_of(path)
  ::File.join(@root, path).delete_prefix('/')
end

#mkdir_p(dir) ⇒ Object



340
341
342
343
344
345
346
347
# File 'lib/etna/filesystem.rb', line 340

# Sends a CreateFolderRequest for dir (resolved through metis_path_of) in this
# filesystem's project and bucket.
#
# NOTE(review): whether missing intermediate folders are created is decided by
# the Metis client's create_folder, which is not visible here.
#
# @param dir folder path to create
# @return whatever the Metis client's create_folder returns
def mkdir_p(dir)
  @metis_client.create_folder(
    Etna::Clients::Metis::CreateFolderRequest.new(
      project_name: @project_name,
      bucket_name: @bucket_name,
      folder_path: metis_path_of(dir)
    )
  )
end

#with_hot_pipe(opts, receiver, *args, &block) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/etna/filesystem.rb', line 265

# Connects a caller-supplied block to a background task through an IO pipe.
#
# One end of the pipe is passed to +receiver+, which is invoked with send on a
# single-thread executor; the other end is yielded to the block. After the
# block returns, this method waits for the background task to finish.
#
# @param opts [String] mode string; when it includes 'w' the block gets the
#   writable end and receiver the readable end, otherwise the reverse
# @param receiver [Symbol] name of the method on self to run in the
#   background, called as receiver(pipe_end, *args)
# @param args extra arguments forwarded to receiver
# @yield [io] the pipe end the caller writes to ('w') or reads from
# @raise any error raised by the background task; it is logged and then
#   surfaced again by future.wait!
def with_hot_pipe(opts, receiver, *args, &block)
  rp, wp = IO.pipe
  begin
    executor = Concurrent::SingleThreadExecutor.new(fallback_policy: :abort)
    begin
      if opts.include?('w')
        future = Concurrent::Promises.future_on(executor) do
          self.send(receiver, rp, *args)
        rescue => e
          Etna::Application.instance.logger.log_error(e)
          raise e
        ensure
          rp.close
        end

        yield wp
        # The block is done producing data: close the write end now so the
        # background reader sees EOF instead of blocking while we wait on it
        # below. Deliberately not in an ensure -- if the block raised, we must
        # not signal a clean end-of-stream to the reader.
        wp.close
      else
        future = Concurrent::Promises.future_on(executor) do
          self.send(receiver, wp, *args)
        rescue => e
          Etna::Application.instance.logger.log_error(e)
          raise e
        ensure
          wp.close
        end

        yield rp
      end

      future.wait!
    ensure
      executor.shutdown
      # Give the worker 5 seconds to finish before forcing it down.
      executor.kill unless executor.wait_for_termination(5)
    end
  ensure
    # Closing an already-closed IO is a no-op, so this is safe on every path.
    rp.close
    wp.close
  end
end

#with_readable(src, opts = 'r', &block) ⇒ Object



327
328
329
330
331
332
333
334
# File 'lib/etna/filesystem.rb', line 327

# Looks up src in the listing of its parent directory and, if found, yields a
# readable pipe end fed by do_streaming_download.
#
# @param src [String] path of the file to read
# @param opts [String] mode string forwarded to with_hot_pipe
# @yield [io] readable end of the pipe
# @raise [RuntimeError] when no file with src's basename is in the listing
def with_readable(src, opts = 'r', &block)
  listing = list_metis_directory(::File.dirname(src))
  wanted_name = ::File.basename(src)
  metis_file = listing.files.all.find { |candidate| candidate.file_name == wanted_name }

  if metis_file.nil?
    raise "Metis file at #{@project_name}/#{@bucket_name}/#{@root}/#{src} not found.  No such file"
  end

  with_hot_pipe(opts, :do_streaming_download, metis_file, &block)
end

#with_writeable(dest, opts = 'w', size_hint: nil, &block) ⇒ Object



313
314
315
316
317
# File 'lib/etna/filesystem.rb', line 313

# Yields a writable pipe end whose contents are consumed by
# do_streaming_upload(pipe, dest, size_hint) via with_hot_pipe.
#
# @param dest destination path forwarded to do_streaming_upload
# @param opts [String] mode string forwarded to with_hot_pipe
# @param size_hint value forwarded to do_streaming_upload (defaults to nil)
# @yield [io] writable end of the pipe
def with_writeable(dest, opts = 'w', size_hint: nil, &block)
  with_hot_pipe(opts, :do_streaming_upload, dest, size_hint, &block)
end