Class: Cnvrg::Downloader::Client
- Inherits: Object
- Inheritance chain: Object → Cnvrg::Downloader::Client
- Defined in:
- lib/cnvrg/downloader/client.rb
Direct Known Subclasses
Cnvrg::Downloader::Clients::AzureClient, Cnvrg::Downloader::Clients::GcpClient, Cnvrg::Downloader::Clients::S3Client
Class Method Summary collapse
- .factory(params) ⇒ Object
Instance Method Summary collapse
- #cut_prefix(prefix, file) ⇒ Object
- #decrypt(str) ⇒ Object
- #download(storage_path, local_path, decrypt: true) ⇒ Object
- #extract_key_iv(sts_path) ⇒ Object
- #initialize(params) ⇒ Client (constructor): Returns a new instance of Client.
- #link_file(cached_commits, local_path, dataset_title, file_name) ⇒ Object
- #mkdir(path, recursive: false) ⇒ Object
- #prepare_download(local_path) ⇒ Object
- #safe_download(storage_path, local_path, decrypt: true) ⇒ Object
- #safe_operation(local_path) ⇒ Object
- #safe_upload(storage_path, local_path) ⇒ Object
- #upload(storage_path, local_path) ⇒ Object
Constructor Details
#initialize(params) ⇒ Client
Returns a new instance of Client.
9 10 11 12 13 14 |
# File 'lib/cnvrg/downloader/client.rb', line 9
# Sets @key, @iv, @client and @bucket to empty strings.
# +params+ is accepted but not read by this constructor.
# NOTE(review): presumably subclasses replace these placeholders — confirm.
#
# @param params [Object] unused here
def initialize(params)
  @key, @iv, @client, @bucket = '', '', '', ''
end
Class Method Details
.factory(params) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/cnvrg/downloader/client.rb', line 79
# Instantiates the storage client selected by params["storage"].
#
# The input is converted with +as_json+ first, so every lookup below uses
# string keys.
#
# @param params [#as_json] connection settings
# @return [Object, nil] an S3Client for 's3' / 'minio', an AzureClient for
#   'azure', a GcpClient for 'gcp'; nil for any other storage value
def self.factory(params)
  params = params.as_json

  case params["storage"]
  when 's3', 'minio'
    Cnvrg::Downloader::Clients::S3Client.new(
      sts_path: params["path_sts"],
      access_key: params["sts_a"],
      secret: params["sts_s"],
      session_token: params["sts_st"],
      region: params["region"],
      bucket: params["bucket"],
      encryption: params["encryption"],
      endpoint: params["endpoint"],
      storage: params["storage"]
    )
  when 'azure'
    # Only these four settings are forwarded; everything else is dropped.
    wanted_keys = %i[storage_account_name storage_access_key container sts]
    azure_params = params.symbolize_keys.slice(*wanted_keys)
    Cnvrg::Downloader::Clients::AzureClient.new(**azure_params)
  when 'gcp'
    Cnvrg::Downloader::Clients::GcpClient.new(
      project_id: params["project_id"],
      credentials: params["credentials"],
      bucket_name: params["bucket_name"],
      sts: params["sts"]
    )
  end
end
Instance Method Details
#cut_prefix(prefix, file) ⇒ Object
32 33 34 |
# File 'lib/cnvrg/downloader/client.rb', line 32 def cut_prefix(prefix, file) file.gsub(prefix, '').gsub(/^\/*/, '') end |
#decrypt(str) ⇒ Object
71 72 73 |
# File 'lib/cnvrg/downloader/client.rb', line 71
# Passes +str+ to Cnvrg::Helpers.decrypt together with this client's @key
# and @iv, and returns whatever that helper returns.
#
# @param str [Object] value handed to the helper unchanged
def decrypt(str)
  key, iv = @key, @iv
  Cnvrg::Helpers.decrypt(key, iv, str)
end
#download(storage_path, local_path, decrypt: true) ⇒ Object
36 37 38 |
# File 'lib/cnvrg/downloader/client.rb', line 36
# Placeholder with no behaviour: ignores its arguments and returns nil.
# NOTE(review): presumably each storage-specific subclass overrides this — confirm.
#
# @param storage_path [Object] unused here
# @param local_path [Object] unused here
# @param decrypt [Boolean] unused here
# @return [nil]
def download(storage_path, local_path, decrypt: true)
  # Still needs an implementation.
  nil
end
#extract_key_iv(sts_path) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/cnvrg/downloader/client.rb', line 16
# Reads the resource at +sts_path+ and returns its content split on newlines.
#
# A failed read is logged through Cnvrg::Logger.log_error and retried after
# sleeping +backoff_time(attempt)+ seconds. After 21 failed attempts a
# StandardError carrying the last error's message is raised; no sleep happens
# before that final raise.
#
# NOTE(review): presumably the returned lines are the key and IV the method
#   name refers to — confirm against callers.
# NOTE(review): ssl_verify_mode: 0 looks like OpenSSL::SSL::VERIFY_NONE, i.e.
#   certificate verification is switched off — confirm this is intended.
# NOTE(review): the ssl_verify_mode option implies open-uri; Kernel#open no
#   longer handles URLs from Ruby 3.0 on (URI.open is needed there) — confirm
#   the supported Ruby versions.
#
# @param sts_path [String] location handed to +open+
# @return [Array<String>] the lines of the resource
# @raise [StandardError] when every attempt failed
def extract_key_iv(sts_path)
  count = 0
  begin
    count += 1
    # Block form closes the handle as soon as the content has been read.
    open(sts_path, {ssl_verify_mode: 0}, &:read).split("\n")
  rescue => e
    Cnvrg::Logger.log_error(e)
    if count <= 20
      sleep backoff_time(count)
      retry
    end
    raise StandardError, "Cant access storage: #{e.message}"
  end
end
#link_file(cached_commits, local_path, dataset_title, file_name) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/cnvrg/downloader/client.rb', line 40
# Tries to satisfy a download by linking to a copy that already exists under
# /nfs-disk. For each entry of +cached_commits+, in order, the path
# /nfs-disk/<commit>/<dataset_title>/<file_name> is checked; the first one
# that exists is linked to +local_path+ with FileUtils.ln.
#
# The parent directory of +local_path+ is prepared first via
# +prepare_download+. Any StandardError is logged and reported as false.
#
# @param cached_commits [#each] commit identifiers to look under
# @param local_path [String] where the link is created
# @param dataset_title [String] directory name below the commit
# @param file_name [String] file path below the dataset directory
# @return [Boolean] true when a link was created, false otherwise
def link_file(cached_commits, local_path, dataset_title, file_name)
  prepare_download(local_path)

  cached_commits.each do |commit|
    candidate = "/nfs-disk/#{commit}/#{dataset_title}/#{file_name}"
    next unless File.exist?(candidate)

    FileUtils.ln(candidate, local_path)
    return true
  end

  false
rescue => error
  Cnvrg::Logger.log_error(error)
  false
end
#mkdir(path, recursive: false) ⇒ Object
63 64 65 |
# File 'lib/cnvrg/downloader/client.rb', line 63
# Creates the directory +path+, using FileUtils.mkdir_p when +recursive+ is
# truthy and FileUtils.mkdir otherwise.
#
# @param path [String] directory to create
# @param recursive [Boolean] whether missing parent directories are created too
# @return [Object] whatever the FileUtils call returns
def mkdir(path, recursive: false)
  if recursive
    FileUtils.mkdir_p(path)
  else
    FileUtils.mkdir(path)
  end
end
#prepare_download(local_path) ⇒ Object
67 68 69 |
# File 'lib/cnvrg/downloader/client.rb', line 67
# Makes sure the directory that will contain +local_path+ exists by calling
# +mkdir+ recursively on its dirname.
#
# @param local_path [String] path of the file about to be written
# @return [Object] whatever +mkdir+ returns
def prepare_download(local_path)
  parent_dir = File.dirname(local_path)
  mkdir(parent_dir, recursive: true)
end
#safe_download(storage_path, local_path, decrypt: true) ⇒ Object
55 56 57 |
# File 'lib/cnvrg/downloader/client.rb', line 55
# Runs +download+ inside +safe_operation+, which is given +local_path+.
#
# @param storage_path [String] forwarded to +download+
# @param local_path [String] forwarded to +download+ and to +safe_operation+
# @param decrypt [Boolean] forwarded to +download+
# @return [Object] whatever +safe_operation+ returns
def safe_download(storage_path, local_path, decrypt: true)
  safe_operation(local_path) do
    download(storage_path, local_path, decrypt: decrypt)
  end
end
#safe_operation(local_path) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/cnvrg/downloader/client.rb', line 92
# Runs the given block, retrying it up to RETRIES times when it raises a
# StandardError.
#
# Every failure is logged with the attempt number. Between attempts the method
# sleeps +backoff_time(attempt)+ seconds; after the final attempt it does not
# sleep, because nothing is left to wait for. If all attempts fail, the last
# error is re-raised.
#
# NOTE(review): the logger method name below was reconstructed as
#   +log_error_message+ — verify against Cnvrg::Logger.
#
# @param local_path [String] only used in the log message
# @yield the operation to attempt
# @return [true] once the block completes without raising
# @raise [StandardError] the error from the last attempt when none succeeded
def safe_operation(local_path)
  last_error = nil

  1.upto(RETRIES) do |attempt|
    begin
      yield
      return true
    rescue => e
      last_error = e
      final_attempt = attempt >= RETRIES
      backoff_time_seconds = backoff_time(attempt)

      message = "Got error: #{e.class.name} with message: #{e.message} while uploading / downloading a single file: #{local_path}, retry: #{attempt} of: #{RETRIES}"
      message += if final_attempt
                   ", done retry, continuing to the next file"
                 else
                   ", next retry in: #{backoff_time_seconds} seconds"
                 end
      Cnvrg::Logger.log_error_message(message)

      sleep backoff_time_seconds unless final_attempt
    end
  end

  raise last_error if last_error
  true
end
#safe_upload(storage_path, local_path) ⇒ Object
75 76 77 |
# File 'lib/cnvrg/downloader/client.rb', line 75
# Runs +upload+ inside +safe_operation+, which is given +local_path+.
#
# @param storage_path [String] forwarded to +upload+
# @param local_path [String] forwarded to +upload+ and to +safe_operation+
# @return [Object] whatever +safe_operation+ returns
def safe_upload(storage_path, local_path)
  safe_operation(local_path) do
    upload(storage_path, local_path)
  end
end
#upload(storage_path, local_path) ⇒ Object
59 60 61 |
# File 'lib/cnvrg/downloader/client.rb', line 59
# Placeholder with no behaviour: ignores its arguments and returns nil.
# NOTE(review): presumably each storage-specific subclass overrides this — confirm.
#
# @param storage_path [Object] unused here
# @param local_path [Object] unused here
# @return [nil]
def upload(storage_path, local_path)
  # Still needs an implementation.
  nil
end