Class: Bio::BaseSpace::MultipartUpload
- Inherits:
-
Object
- Object
- Bio::BaseSpace::MultipartUpload
- Defined in:
- lib/basespace/model/multipart_upload.rb
Overview
Multipart file upload class.
TODO This file is not yet ported as the multipartFileUpload class is just mentioned in the comment section of the BaseSpaceAPI file.
Instance Method Summary collapse
- #__cleanUp__ ⇒ Object
- #finalize ⇒ Object
- #getFileResponse ⇒ Object
- #getProgressRatio ⇒ Object
- #getRunningThreadCount ⇒ Object
- #getRunningTime ⇒ Object
- #getStatus ⇒ Object
-
#getTotalTransfered ⇒ Object
Returns the total data amoun transfered in Gb.
- #getTransRate ⇒ Object
- #haltUpload ⇒ Object
- #hasFinished ⇒ Object
-
#initialize(api, a_id, local_file, file_object, cpu_count, part_size, temp_dir, verbose) ⇒ MultipartUpload
constructor
A new instance of MultipartUpload.
- #pauseUpload ⇒ Object
- #run ⇒ Object
- #setup ⇒ Object
- #startUpload(returnOnFinish = 0, testInterval = 5) ⇒ Object
- #to_s ⇒ Object
- #to_str ⇒ Object
Constructor Details
#initialize(api, a_id, local_file, file_object, cpu_count, part_size, temp_dir, verbose) ⇒ MultipartUpload
Returns a new instance of MultipartUpload.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/basespace/model/multipart_upload.rb', line 136 def initialize(api, a_id, local_file, file_object, cpu_count, part_size, temp_dir, verbose) @api = api @analysis_id = a_id @local_file = local_file @remote_file = file_object @part_size = part_size @cpu_count = cpu_count @verbose = verbose @temp_dir = temp_dir @status = 'Initialized' @start_time = -1 #@repeat_count = 0 # number of chunks we uploaded multiple times setup end |
Instance Method Details
#__cleanUp__ ⇒ Object
207 208 209 |
# File 'lib/basespace/model/multipart_upload.rb', line 207 def __cleanUp__ self.stats[0] += 1 end |
#finalize ⇒ Object
239 240 241 242 243 244 245 246 247 |
# File 'lib/basespace/model/multipart_upload.rb', line 239 def finalize raise Exception('Cannot finalize a transfer with running threads.') if self.getRunningThreadCount() if self.Status=='Running' # code here for self.Status=='Completed' else raise Exception('To finalize the status of the transfer must be "Running."') end end |
#getFileResponse ⇒ Object
268 269 270 |
# File 'lib/basespace/model/multipart_upload.rb', line 268 def getFileResponse return self.remoteFile end |
#getProgressRatio ⇒ Object
295 296 297 298 299 |
# File 'lib/basespace/model/multipart_upload.rb', line 295 def getProgressRatio currentQ = float(self.tasks.qsize() - len(self.consumers)) # NOTE Python sublist [:6] already ported to Ruby [0..5] return str(float(self.totalTask - currentQ) / self.totalTask)[0..5] end |
#getRunningThreadCount ⇒ Object
272 273 274 |
# File 'lib/basespace/model/multipart_upload.rb', line 272 def getRunningThreadCount return sum(self.consumers.map { |c| c.is_alive() }) end |
#getRunningTime ⇒ Object
282 283 284 285 286 287 288 |
# File 'lib/basespace/model/multipart_upload.rb', line 282 def getRunningTime if self.StartTime==-1 return 0 else return time.time() - self.StartTime end end |
#getStatus ⇒ Object
264 265 266 |
# File 'lib/basespace/model/multipart_upload.rb', line 264 def getStatus return self.Status end |
#getTotalTransfered ⇒ Object
Returns the total data amoun transfered in Gb
291 292 293 |
# File 'lib/basespace/model/multipart_upload.rb', line 291 def getTotalTransfered return float((self.totalTask - self.tasks.qsize())*self.partSize) / 1000.0 end |
#getTransRate ⇒ Object
276 277 278 279 280 |
# File 'lib/basespace/model/multipart_upload.rb', line 276 def getTransRate # tasks completed size of file-parts # NOTE Python sublist [:6] already ported to Ruby [0..5] return str((self.totalTask - self.tasks.qsize())*self.partSize/self.getRunningTime())[0..5] + ' mb/s' end |
#haltUpload ⇒ Object
259 260 261 262 |
# File 'lib/basespace/model/multipart_upload.rb', line 259 def haltUpload self.consumers.each { |c| c.terminate() } self.Status = 'Terminated' end |
#hasFinished ⇒ Object
249 250 251 252 |
# File 'lib/basespace/model/multipart_upload.rb', line 249 def hasFinished return 0 if self.Status == 'Initialized' return !(self.getRunningThreadCount() > 0) end |
#pauseUpload ⇒ Object
254 255 256 257 |
# File 'lib/basespace/model/multipart_upload.rb', line 254 def pauseUpload self.pauseEvent.set() self.Status = 'Paused' end |
#run ⇒ Object
166 167 168 169 170 |
# File 'lib/basespace/model/multipart_upload.rb', line 166 def run while @status == 'Paused' or __check_queue__ time.sleep(self.wait) end end |
#setup ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/basespace/model/multipart_upload.rb', line 172 def setup # determine the totalSize = os.path.getsize(self.localFile) fileCount = int(math.ceil(totalSize/(self.partSize*1024.0*1000))) if self.verbose print "TotalSize " + str(totalSize) print "Using split size " + str(self.partSize) +"Mb" print "Filecount " + str(fileCount) print "CPUs " + str(self.cpuCount) end # Establish communication queues self.tasks = multiprocessing.JoinableQueue() self.completedPool = multiprocessing.Queue() [1..fileCount].each { |i| # set up the task queue t = uploadTask(self.api,self.remoteFile.Id, i, fileCount, self.localFile, 0) self.tasks.put(t) } self.totalTask = self.tasks.qsize() # create consumers self.pauseEvent = multiprocessing.Event() self.haltEvent = multiprocessing.Event() if self.verbose print 'Creating %d consumers' % self.cpuCount print "queue size " + str(self.tasks.qsize()) end # NOTE Original code -- note the strange indent. Variables i and c not used. Buggy code? # self.consumers = [ Consumer(self.tasks, self.completedPool,self.pauseEvent,self.haltEvent) for i in xrange(self.cpuCount) ] # for c in self.consumers: self.tasks.put(None) # add poisson pill self.consumers = [0..self.cpuCount].map { |i| Consumer(self.tasks, self.completedPool, self.pauseEvent, self.haltEvent) } self.consumers.each { |c| self.tasks.put(nil) } end |
#startUpload(returnOnFinish = 0, testInterval = 5) ⇒ Object
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/basespace/model/multipart_upload.rb', line 211 def startUpload(returnOnFinish = 0, testInterval = 5) raise Exception('Cannot resume a ' + self.Status + ' multi-part upload session.') if self.Status == 'Terminated' or self.Status == 'Completed' if self.Status == 'Initialized' self.StartTime = time.time() self.consumers.each { |w| w.start() } elsif self.Status == 'Paused' self.pauseEvent.clear() end self.Status = 'Running' # If returnOnFinish is set if returnOnFinish i=0 while not self.hasFinished() print str(i) + ': ' + str(self) if self.verbose and i time.sleep(testInterval) i+=1 end self.finalize() return 1 else return 1 end end |
#to_s ⇒ Object
151 152 153 154 155 156 157 158 159 160 |
# File 'lib/basespace/model/multipart_upload.rb', line 151 def to_s # TODO fix this. # NOTE Python sublist notation [:5] already changed to Ruby [0..4] return "MPU - Stat: " + @status + ", LiveThread: " + str(self.getRunningThreadCount()) + \ ", RunTime: " + str(self.getRunningTime())[0..4] + 's' + \ ", Q-size " + str(self.tasks.qsize()) + \ ", Completed " + str(self.getProgressRatio()) + \ ", AVG TransferRate " + self.getTransRate() + \ ", Data transfered " + str(self.getTotalTransfered())[0..4] + 'Gb' end |
#to_str ⇒ Object
162 163 164 |
# File 'lib/basespace/model/multipart_upload.rb', line 162 def to_str return self.inspect end |