Class: Bio::BaseSpace::Consumer
- Inherits: Object
  - Object
  - Bio::BaseSpace::Consumer
- Defined in: lib/basespace/model/multipart_upload.rb
Overview
Multipart file upload consumer class.
TODO: This file is not yet ported, because the multipartFileUpload class is only mentioned in the comment section of the BaseSpaceAPI file.
Instance Method Summary
- #initialize(task_queue, result_queue, pause_event, halt_event) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #run ⇒ Object
  TODO.
Constructor Details
#initialize(task_queue, result_queue, pause_event, halt_event) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/basespace/model/multipart_upload.rb', line 85
def initialize(task_queue, result_queue, pause_event, halt_event)
  # TODO http://stackoverflow.com/questions/710785/working-with-multiple-processes-in-ruby
  # http://stackoverflow.com/questions/855805/please-introduce-a-multi-processing-library-in-perl-or-ruby
  # http://docs.python.jp/2.6/library/multiprocessing.html
  #multiprocessing.Process.__init__(self)
  @task_queue = task_queue
  @result_queue = result_queue
  @pause = pause_event  # was `pauseEvent`, which is not a defined name
  @halt = halt_event    # was `haltEvent`, which is not a defined name
end
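The constructor only stores its four collaborators, so the interesting question is what a caller would pass in. The sketch below is one possible answer, not something the gem provides: Ruby's standard library has no direct counterpart to Python's multiprocessing.Event, so `SimpleEvent` is a hypothetical stand-in that exposes the `set` and `is_set` methods used by #run, and the two queues are plain `Queue` objects.

```ruby
# Hypothetical helper, not part of the gem: a thread-safe boolean flag
# modelled on Python's multiprocessing.Event.
class SimpleEvent
  def initialize
    @mutex = Mutex.new
    @flag = false
  end

  # Raise the flag.
  def set
    @mutex.synchronize { @flag = true }
  end

  # Lower the flag again.
  def clear
    @mutex.synchronize { @flag = false }
  end

  # True while the flag is raised.
  def is_set
    @mutex.synchronize { @flag }
  end
end

task_queue   = Queue.new       # tasks waiting to be processed
result_queue = Queue.new       # answers from finished tasks
pause_event  = SimpleEvent.new # raised to pause the consumer
halt_event   = SimpleEvent.new # raised to stop the consumer

# With the gem loaded, these would be handed to the constructor:
# consumer = Bio::BaseSpace::Consumer.new(task_queue, result_queue, pause_event, halt_event)
```

Note that a plain `Queue` does not respond to `get`, `put` or `task_done`, which the unported #run still calls, so this only illustrates the constructor's arguments.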
Instance Method Details
#run ⇒ Object
TODO
# File 'lib/basespace/model/multipart_upload.rb', line 97
def run
  # Python's Process#name has no counterpart here, so the class name is used.
  proc_name = self.class.name
  while true
    unless @pause.is_set
      next_task = @task_queue.get
    end
    if next_task.nil? or @halt.is_set # check if we are out of jobs or have been halted
      # Poison pill means shutdown
      puts "#{proc_name}: Exiting"
      @task_queue.task_done
      break
    elsif @pause.is_set # if we have been paused, sleep for a bit then check back
      puts "#{proc_name}: Paused"
      sleep(3)
    else # do some work
      puts "#{proc_name}: #{next_task}"
      answer = next_task.call
      @task_queue.task_done
      if answer.state == 1 # case everything went well
        @result_queue.put(answer)
      else # case something went wrong
        if next_task.attempt < 3
          @task_queue.put(next_task) # queue the guy for a retry
        else # problems, shutting down this party
          @halt.set # halt all other processes
        end
      end
    end
  end
end
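The loop in #run follows a common worker pattern: take a task, stop on a poison pill or a halt flag, wait while paused, retry a failed task up to three times, and halt everything once the retries are used up. A minimal thread-based sketch of that pattern is given below. Everything in it is an assumption made for illustration (`SimpleEvent`, `Task`, `Answer`, `consume` and `MAX_ATTEMPTS` do not exist in the gem), and it uses Ruby's `Queue#pop` and `Queue#push` in place of the Python-style `get`, `put` and `task_done` calls.

```ruby
# Hypothetical stand-in for Python's multiprocessing.Event.
class SimpleEvent
  def initialize
    @mutex = Mutex.new
    @flag = false
  end

  def set
    @mutex.synchronize { @flag = true }
  end

  def is_set
    @mutex.synchronize { @flag }
  end
end

# Hypothetical result type: state 1 means success, anything else failure.
Answer = Struct.new(:state, :value)

# Hypothetical task wrapper that counts how often it has been run.
class Task
  attr_reader :attempt

  def initialize(&work)
    @work = work
    @attempt = 0
  end

  # Runs the block; an exception is reported as a failed Answer.
  def call
    @attempt += 1
    @work.call
  rescue StandardError
    Answer.new(0, nil)
  end
end

MAX_ATTEMPTS = 3

def consume(task_queue, result_queue, pause, halt)
  loop do
    break if halt.is_set          # halted by this or another worker
    if pause.is_set               # paused: wait briefly, then check again
      sleep(0.05)
      next
    end
    task = task_queue.pop         # blocks until a task or nil arrives
    break if task.nil?            # nil is the poison pill
    answer = task.call
    if answer.state == 1
      result_queue.push(answer)   # everything went well
    elsif task.attempt < MAX_ATTEMPTS
      task_queue.push(task)       # queue the task for a retry
    else
      halt.set                    # retries exhausted: stop all workers
    end
  end
end

# Usage: one task succeeds, one always fails and triggers the halt.
tasks, results = Queue.new, Queue.new
pause, halt = SimpleEvent.new, SimpleEvent.new
ok  = Task.new { Answer.new(1, "part 1 uploaded") }
bad = Task.new { raise "simulated network error" }
tasks.push(ok)
tasks.push(bad)
Thread.new { consume(tasks, results, pause, halt) }.join
puts "results: #{results.size}, halted: #{halt.is_set}, attempts: #{bad.attempt}"
```

Unlike the listing above, this sketch checks the halt and pause flags before taking a task, so a worker that starts out paused does not mistake a missing task for a poison pill. A retried task is pushed to the back of the queue, so a poison pill queued earlier would be reached first. A worker blocked in `pop` also only notices a halt once another item arrives.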