Class: Cnvrg::Storage
- Inherits: Object
- Inheritance chain: Object → Cnvrg::Storage
- Defined in:
- lib/cnvrg/storage.rb
Instance Method Summary
- #clone(commit: nil) ⇒ Object
- #do_parallel ⇒ Object
- #download_file_thread(file) ⇒ Object
- #file_gen_thread(file_gen) ⇒ Object
- #file_gen_upload_thread(files_generator) ⇒ Object
- #get_chunks_size ⇒ Object
- #handle_errors ⇒ Object
- #init_progress_bar(size: nil, title: "Download Progress") ⇒ Object
- #initialize(dataset: nil, project: nil, root_path: nil) ⇒ Storage (constructor): returns a new instance of Storage.
- #log_error(action: nil, error: '') ⇒ Object
- #make_progress(size: 1) ⇒ Object
- #storage_action(files_generator: nil, action: nil, progress: {size: 0, title: ''}) ⇒ Object
- #upload_files(files_generator, progress: {size: 0, title: ''}) ⇒ Object
Constructor Details
#initialize(dataset: nil, project: nil, root_path: nil) ⇒ Storage
Returns a new instance of Storage.
3 4 5 6 7 |
# File 'lib/cnvrg/storage.rb', line 3
#
# Builds a storage helper around a dataset or a project.
#
# @param dataset [Object, nil] element to operate on; used whenever it is truthy
# @param project [Object, nil] element used when +dataset+ is nil/false
# @param root_path [Object, nil] stored unchanged in @root_path
def initialize(dataset: nil, project: nil, root_path: nil)
  @root_path = root_path
  @element = dataset ? dataset : project
  # The chosen element supplies the client kept in @client.
  @client = @element.get_storage_client
end
Instance Method Details
#clone(commit: nil) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/cnvrg/storage.rb', line 32
#
# Clones the element's files by handing a chunk fetcher and a download
# action to #storage_action.
#
# @param commit [Object, nil] forwarded to @element.get_clone_chunk
# @return [Object] whatever #storage_action returns
def clone(commit: nil)
  # Receives a hash carrying :limit and :offset and asks the element for that page.
  chunk_fetcher = proc do |params|
    @element.get_clone_chunk(commit: commit, chunk_size: params[:limit], offset: params[:offset])
  end
  downloader = proc { |storage, local| @client.safe_download(storage, local) }

  @stats = @element.get_stats
  storage_action(
    files_generator: chunk_fetcher,
    action: downloader,
    progress: { size: @stats['commit_size'], title: "Clone Progress" }
  )
end
#do_parallel ⇒ Object
105 106 107 108 109 110 111 112 |
# File 'lib/cnvrg/storage.rb', line 105
#
# Drains @files through Parallel.each, yielding every queued file to the
# caller's block on get_chunks_size threads.
#
# @yieldparam file [Object] an item popped from @files
# @return [Object] the result of Parallel.each
def do_parallel
  # Job source for Parallel: hand out queued files; when the queue is empty,
  # stop if the producer has finished, otherwise wait a second.
  next_job = lambda do
    next @files.pop unless @files.empty?

    @finished ? Parallel::Stop : sleep(1)
  end

  Parallel.each(next_job, in_threads: get_chunks_size) do |file|
    # sleep(1) returns 1, so a 1 here is an idle poll, not a file.
    yield(file) unless file == 1
  end
end
#download_file_thread(file) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/cnvrg/storage.rb', line 114
#
# Per-file worker: makes up to six attempts for one file entry and reports
# each failed attempt through #log_error.
#
# NOTE(review): the transfer call is commented out in the source and this
# method never yields to a block, so as written it performs no download.
# Confirm the intended behavior before re-enabling it.
#
# @param file [Hash, nil] entry read via file['name'] and file['path']
# @return [nil]
def download_file_thread(file)
  return if file.blank?

  local_path = file['name']
  storage_path = file['path'] # only referenced by the disabled call below
  6.times do
    begin
      # @client.download(storage_path, "#{@root_path}/#{local_path}")
      break
    rescue => e
      # The original read `e.` with the method name missing; the exception's
      # message is what the log entry needs.
      log_error(action: "download #{local_path}", error: e.message)
    end
  end
  nil
end
#file_gen_thread(file_gen) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/cnvrg/storage.rb', line 87
#
# Producer: pages through +file_gen+ and pushes every returned file onto
# @files until an empty page comes back.
#
# @finished is set in an +ensure+ so that a generator failure still marks the
# producer as done; otherwise the consumers in #do_parallel would poll an
# empty queue forever. The exception itself still propagates.
#
# @param file_gen [#call] called with limit: and offset:, returns a page of files
# @return [true]
def file_gen_thread(file_gen)
  offset = 0
  chunk_size = get_chunks_size
  loop do
    files = file_gen.call(limit: chunk_size, offset: offset)
    break if files.blank?

    files.each { |f| @files.push(f) }
    # Advance by what was actually returned, not by the requested limit.
    offset += files.size
  end
  true
ensure
  @finished = true
end
#file_gen_upload_thread(files_generator) ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/cnvrg/storage.rb', line 61
#
# Producer for uploads: pushes every entry of +files_generator+ onto @files
# once, then marks the producer as finished.
#
# The original looped on `files = files_generator` without ever advancing it,
# so any non-empty collection was re-pushed forever, and a nil argument
# raised on `.each` before the emptiness check could run.
#
# NOTE(review): this treats +files_generator+ as a plain enumerable of files;
# confirm that matches the callers.
#
# @param files_generator [#each, nil] files to enqueue
# @return [true]
def file_gen_upload_thread(files_generator)
  files_generator.each { |f| @files.push(f) } unless files_generator.nil?
  true
ensure
  # Always release the consumers, even if enqueueing failed part-way.
  @finished = true
end
#get_chunks_size ⇒ Object
14 15 16 |
# File 'lib/cnvrg/storage.rb', line 14
#
# Reads the chunk size from the CNVRG_STORAGE_CHUNK_SIZE environment
# variable, defaulting to 10 when the variable is not set.
#
# @return [Integer] the value converted with #to_i (non-numeric text gives 0)
def get_chunks_size
  ENV.fetch('CNVRG_STORAGE_CHUNK_SIZE', 10).to_i
end
#handle_errors ⇒ Object
99 100 101 102 103 |
# File 'lib/cnvrg/storage.rb', line 99
#
# Dumps @storage_errors as YAML to ".cnvrg/errors.yml" under the element's
# working directory. Does nothing when there are no errors.
#
# @return [Integer, nil] the result of the write, or nil when nothing was written
def handle_errors
  return unless @storage_errors.present?

  errors_file = @element.working_dir + "/.cnvrg/errors.yml"
  File.open(errors_file, "w+") do |out|
    out.write(@storage_errors.to_yaml)
  end
end
#init_progress_bar(size: nil, title: "Download Progress") ⇒ Object
18 19 20 21 22 23 24 25 |
# File 'lib/cnvrg/storage.rb', line 18
#
# Creates the progress bar and keeps it in @progressbar.
# (The method name was missing from the extracted `def` line; it is restored
# here from the method's heading.)
#
# @param size [Object, nil] passed to ProgressBar.create as :total
# @param title [String] passed to ProgressBar.create as :title
# @return [Object] the object returned by ProgressBar.create
def init_progress_bar(size: nil, title: "Download Progress")
  @progressbar = ProgressBar.create(
    title: title,
    progress_mark: '=',
    format: "%b>>%i| %p%% %t",
    starting_at: 0,
    total: size,
    autofinish: true
  )
end
#log_error(action: nil, error: '') ⇒ Object
9 10 11 |
# File 'lib/cnvrg/storage.rb', line 9
#
# Formats an error entry, records it in @storage_errors, and returns it.
#
# The original only built and returned the string, and the visible caller
# discards that return value, so @storage_errors never received an entry and
# #handle_errors had nothing to write. The entry is now appended as well;
# the return value is unchanged.
#
# @param action [Object, nil] label for the failed action; 'default' when nil/false
# @param error [Object] error text interpolated into the entry
# @return [String] "[<time>] (<action>) <error>"
def log_error(action: nil, error: '')
  entry = "[#{Time.now}] (#{action || 'default'}) #{error}"
  # @storage_errors may still be unset if no transfer has been started yet.
  (@storage_errors ||= []) << entry
  entry
end
#make_progress(size: 1) ⇒ Object
27 28 29 |
# File 'lib/cnvrg/storage.rb', line 27
#
# Advances @progressbar by +size+.
#
# @param size [Numeric] amount added to the bar's current progress
# @return [Numeric] the new progress value
def make_progress(size: 1)
  current = @progressbar.progress
  @progressbar.progress = current + size
end
#storage_action(files_generator: nil, action: nil, progress: {size: 0, title: ''}) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/cnvrg/storage.rb', line 70
#
# Runs a transfer: a producer thread fills @files from +files_generator+
# while #do_parallel consumes the queue, handing each file to
# #download_file_thread and advancing the progress bar by file['size'].
#
# The original ignored +progress+ and never created @progressbar, so
# #make_progress would fail on the first file; the bar is now initialised
# from +progress+ before any work starts.
#
# @param files_generator [#call] passed to #file_gen_thread
# @param action [#call] called from the block given to #download_file_thread
# @param progress [Hash] :size and :title for the progress bar
# @return [Object] the result of #handle_errors
def storage_action(files_generator: nil, action: nil, progress: {size: 0, title: ''})
  ### the generator files should have {path (encrypted), name, size} (progress)
  @storage_errors = []
  @finished = false
  @files = Queue.new
  init_progress_bar(size: progress[:size], title: progress[:title])

  producer = Thread.new { file_gen_thread(files_generator) }
  do_parallel do |file|
    download_file_thread(file) do |local, storage|
      action.call(local, storage)
    end
    make_progress(size: file['size'])
  end
  # Joining also re-raises an error that ended the producer thread.
  producer.join
  handle_errors
end
#upload_files(files_generator, progress: {size: 0, title: ''}) ⇒ Object
46 47 48 |
# File 'lib/cnvrg/storage.rb', line 46
#
# Unimplemented placeholder: accepts +commit+, does nothing, returns nil.
#
# NOTE(review): the method heading on this page lists a different signature
# (files_generator, progress:) than the code shown here — confirm which one
# is intended.
#
# @param commit [Object, nil] accepted but unused
# @return [nil]
def upload_files(commit: nil)
  nil
end