Class: PCOWS
- Inherits: Object
  - Object
    - PCOWS
- Defined in:
- lib/bio-vcf/pcows.rb
Constant Summary collapse
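- RUNNINGEXT = 'part'
  File extension appended to a worker's output file while the worker is still writing it; the file is renamed without it on completion.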
Instance Method Summary collapse
- #cleanup ⇒ Object
-
#initialize(num_threads, chunk_size, name = File.basename(__FILE__), timeout = 180, quiet = false, debug = false) ⇒ PCOWS
constructor
A new instance of PCOWS.
-
#process_output(func = nil, type = :by_line, blocking = false) ⇒ Object
In this section the output gets collected and passed on to a printer thread.
- #process_remaining_output ⇒ Object
- #submit_final_worker(func, state) ⇒ Object
-
#submit_worker(func, state) ⇒ Object
Feed the worker ‘func and state’ to COWS.
-
#wait_for_worker(info) ⇒ Object
Wait for a worker slot to appear.
-
#wait_for_worker_slot ⇒ Object
Make sure no more than num_threads are running at the same time - this is achieved by checking the PID table and the running files in the tmpdir.
-
#wait_for_workers ⇒ Object
This is the final cleanup after the reader thread is done.
Constructor Details
#initialize(num_threads, chunk_size, name = File.basename(__FILE__), timeout = 180, quiet = false, debug = false) ⇒ PCOWS
Returns a new instance of PCOWS.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/bio-vcf/pcows.rb', line 9
# Set up a PCOWS scheduler.
#
# num_threads - maximum number of concurrent workers; nil/false means
#               "use cpu_count()".
# chunk_size  - stored as @chunk_size for use elsewhere in the class.
# name        - prefix of the temporary directory (multi-threaded mode only).
# timeout     - seconds allowed by the Timeout guards while waiting on workers.
# quiet       - suppress informational messages on $stderr.
# debug       - emit extra diagnostics on $stderr.
def initialize(num_threads, chunk_size, name = File.basename(__FILE__), timeout = 180, quiet = false, debug = false)
  num_threads ||= cpu_count()
  @num_threads = num_threads
  @chunk_size  = chunk_size
  @pid_list    = []
  @name        = name
  @timeout     = timeout
  @quiet       = quiet
  @debug       = debug
  $stderr.print "PCOWS running in DEBUG MODE\n" if @debug
  # Scratch space for worker output is only needed when forking.
  @tmpdir = Dir.mktmpdir(@name + '_') if multi_threaded
  @last_output   = 0     # index into @pid_list of the next chunk to print
  @output_locked = false # false, or the [pid,count,fn] entry being printed
end
Instance Method Details
#cleanup ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/bio-vcf/pcows.rb', line 217
# Tear down all workers and the files they produced.
#
# For every submitted worker: send SIGKILL if pid_running? reports it alive,
# then delete its output chunk, call cleanup_keep_file (which process_output
# uses to remove the .keep copy) and delete its in-progress ".part" file.
# Finally removes the temporary directory via cleanup_tmpdir.
#
# Entries recorded by submit_worker in single-threaded mode are
# [nil, nil, nil] and have nothing to clean up, so they are skipped.
def cleanup()
  @pid_list.each do |info|
    pid, _count, fn = info
    next if fn.nil? # single-threaded entry: no child, no files
    if pid_running?(pid)
      $stderr.print "Killing child ",[info],"\n"
      begin
        Process.kill 9, pid
        Process.wait pid
      rescue Errno::ESRCH
        # The child exited between pid_running? and the kill.
        $stderr.print "INFO: The process #{pid} did not exist: Errno::ESRCH\n" if not @quiet
      rescue Errno::ECHILD
        # submit_worker Process.detach'es every child, so the detach thread
        # may already have reaped it.
        $stderr.print "INFO: The process #{pid} was already reaped: Errno::ECHILD\n" if not @quiet
      end
    end
    File.unlink(fn) if File.exist?(fn)
    cleanup_keep_file(fn,wait: false)
    tempfn = fn+'.'+RUNNINGEXT
    File.unlink(tempfn) if File.exist?(tempfn)
  end
  cleanup_tmpdir()
end
#process_output(func = nil, type = :by_line, blocking = false) ⇒ Object
In this section the output gets collected and passed on to a
printer (a forked child process in non-blocking mode, the calling
process in blocking mode). This function makes sure the printing is
ordered and that no two printers are running at the same
time. The printer should be doing as little processing
as possible.
When type == :by_line, each line of the chunk is printed to stdout
as-is (func is not used). Otherwise func is called once with the
chunk's filename.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/bio-vcf/pcows.rb', line 105
# Print finished worker output in submission order, one chunk at a time.
#
# func     - used only when type is not :by_line; called once with the
#            chunk's file name.
# type     - :by_line prints every line of the chunk to $stdout as-is.
# blocking - false: the chunk is printed by a forked, detached child;
#            true:  the chunk is printed in the calling process.
#
# @output_locked holds the [pid,count,fn] entry currently being printed.
# A printed chunk is renamed to fn + '.keep'; once fn no longer exists the
# lock is released via cleanup_keep_file and @last_output advances.
# Returns immediately (nil) in single-threaded mode.
def process_output(func=nil,type=:by_line, blocking=false)
  return if single_threaded
  output = lambda { |fn|
    if type == :by_line
      # File.foreach closes the handle when done (File.new(fn).each_line
      # leaked one descriptor per chunk in blocking mode).
      File.foreach(fn) { |buf| print buf }
    else
      func.call(fn)
    end
  }
  if @output_locked
    # ---- is the other printer still running? We wait until it
    # is finished to start the next one
    (pid,count,fn) = @output_locked
    $stderr.print "Checking for output_lock on existing #{fn}\n" if not @quiet
    return if File.exist?(fn)  # printer still processing this chunk
    # Now we should remove the .keep file
    cleanup_keep_file(fn)
    @last_output += 1          # get next one in line
    @output_locked = false
  end
  # ---- process the next output chunk. After completion it
  # gets renamed to chunk.keep. This is to avoid missing
  # output (if we unlinked the file prematurely)
  if info = @pid_list[@last_output]
    (pid,count,fn) = info
    $stderr.print "Testing (#{@last_output}) for output file ",[info],"\n" if @debug
    if File.exist?(fn)
      # Yes! We have the next output, create outputter
      @output_locked = info
      $stderr.print "Set lock on ",[info],"\n" if not @quiet
      if not blocking
        $stderr.print "Processing output file #{fn} (non-blocking)\n" if not @quiet
        pid = fork do
          output.call(fn)
          # after finishing output move it to .keep
          FileUtils::mv(fn,fn+'.keep')
          exit(0)
        end
        Process.detach(pid)
      else
        $stderr.print "Processing output file #{fn} (blocking)\n" if not @quiet
        output.call(fn)
        FileUtils::mv(fn,fn+'.keep')
      end
    else
      sleep 0.2
    end
  end
end
#process_remaining_output ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/bio-vcf/pcows.rb', line 195
# Drain every outstanding output chunk once the reader has finished, then
# remove the temporary directory. No-op in single-threaded mode.
def process_remaining_output
  return if single_threaded
  $stderr.print "Processing remaining output...\n" unless @quiet
  # Phase 1: let a printer that is already active finish (non-blocking polls).
  loop do
    break unless @output_locked
    sleep 0.2
    process_output
  end
  # Phase 2: walk the chunks in submission order, printing synchronously
  # until each worker is done and its output file has been consumed.
  @pid_list.each do |info|
    pid, _count, fn = info
    loop do
      break unless pid_or_file_running?(pid, fn) || File.exist?(fn)
      $stderr.print "Trying: ", [info], "\n" unless @quiet
      process_output(nil, :by_line, true)
      sleep 0.2
    end
  end
  # Phase 3: keep polling until the lock on the last chunk is cleared.
  loop do
    break unless @output_locked
    sleep 0.1
    process_output(nil, :by_line, true)
  end
  cleanup_tmpdir
end
#submit_final_worker(func, state) ⇒ Object
64 65 66 67 |
# File 'lib/bio-vcf/pcows.rb', line 64
# Submit the last worker of a run: raise the @final_worker flag, then hand
# func/state to submit_worker and return whatever it returns.
def submit_final_worker(func, state)
  @final_worker = true
  submit_worker(func, state)
end
#submit_worker(func, state) ⇒ Object
Feed the worker ‘func and state’ to COWS. Note that func is a lambda closure so it can pick up surrounding scope at invocation in addition to the data captured in ‘state’.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/bio-vcf/pcows.rb', line 33
# Run func.call(state) and print each element it yields.
#
# Multi-threaded: the work happens in a forked, detached child. Its output
# goes to "<fn>.part" and is renamed to fn only when complete, so readers
# never see a half-written chunk.
# Single-threaded: the work runs in this process and prints immediately.
#
# Either way a [pid, count, fn] entry is appended to @pid_list (all nil in
# single-threaded mode). Always returns true.
def submit_worker(func, state)
  pid   = nil
  count = nil
  fn    = nil
  if multi_threaded
    count = @pid_list.size + 1
    fn = mktmpfilename(count)
    pid = fork do
      # ---- inside the forked child
      partial = fn + '.' + RUNNINGEXT
      STDOUT.reopen(File.open(partial, 'w+'))
      func.call(state).each { |text| print text }
      STDOUT.flush
      STDOUT.close
      FileUtils::mv(partial, fn)
      exit(0)
    end
    Process.detach(pid)
  else
    # ---- no forking: produce output right here
    func.call(state).each { |text| print text }
  end
  @pid_list << [pid, count, fn]
  true
end
#wait_for_worker(info) ⇒ Object
Wait for a worker to finish. While it runs, the worker process writes to a file with the extension .part (partial). On completion the file is renamed without the .part extension and its slot becomes free.
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/bio-vcf/pcows.rb', line 160
# Wait (up to @timeout seconds) for one worker's output file to appear.
#
# info - a [pid, count, fn] entry from @pid_list.
#
# Returns early when the worker is no longer running according to
# pid_or_file_running?.
# Raises Timeout::Error (after SIGKILLing the child) when the timeout
# expires.
def wait_for_worker(info)
  (pid,count,fn) = info
  if pid_or_file_running?(pid,fn)
    $stderr.print "Waiting up to #{@timeout} seconds for pid=#{pid} to complete #{fn}\n" if not @quiet
    begin
      Timeout.timeout(@timeout) do
        while not File.exist?(fn)  # wait for the result to appear
          sleep 0.2
          return if not pid_or_file_running?(pid,fn) # worker is gone
        end
      end
      # Partial file should have been renamed.
      # NOTE(review): the loop above only exits normally once fn exists, so
      # this fires only if fn vanished in between; a worker that dies without
      # producing fn is handled silently by the early `return` above.
      raise "FATAL: child process #{pid} appears to have crashed #{fn}" if not File.exist?(fn)
      $stderr.print "OK pid=#{pid}, processing starts of #{fn}\n" if not @quiet
    rescue Timeout::Error => e
      # Kill it to speed up exit. The child was Process.detach'ed in
      # submit_worker, so it may already be gone (ESRCH) or reaped by the
      # detach thread (ECHILD); neither may mask the timeout error.
      begin
        Process.kill 9, pid
        Process.wait pid
      rescue Errno::ESRCH, Errno::ECHILD
        # already gone - nothing left to kill or reap
      end
      $stderr.print "FATAL: child process killed because it stopped responding, pid = #{pid}, fn = #{fn}, count = #{count}\n"
      $stderr.print "Bailing out\n"
      raise e
    end
  end
end
#wait_for_worker_slot ⇒ Object
Make sure no more than num_threads are running at the same time - this is achieved by checking the PID table and the running files in the tmpdir
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/bio-vcf/pcows.rb', line 73
# Block until fewer than @num_threads workers are running, so that no more
# than num_threads run at the same time. A worker counts as running while
# pid_or_file_running?(pid, fn) is truthy for its @pid_list entry.
#
# Prints the "Waiting for slot" message at most once per call.
# Raises Timeout::Error if no slot frees up within @timeout seconds.
# No-op in single-threaded mode.
def wait_for_worker_slot()
  return if single_threaded
  Timeout.timeout(@timeout) do
    printed_timeout_message = false
    loop do
      # ---- count running pids
      running = @pid_list.count { |(pid, _count, fn)| pid_or_file_running?(pid, fn) }
      return if running < @num_threads
      unless printed_timeout_message
        $stderr.print "Waiting for slot (timeout=#{@timeout})\n" if not @quiet
        printed_timeout_message = true
      end
      sleep 0.1
    end
  end
end
#wait_for_workers ⇒ Object
This is the final cleanup after the reader thread is done. All workers need to complete.
188 189 190 191 192 193 |
# File 'lib/bio-vcf/pcows.rb', line 188
# Final step after the reader is done: wait, in submission order, for
# every submitted worker to complete. No-op in single-threaded mode.
def wait_for_workers
  return if single_threaded
  @pid_list.each { |worker| wait_for_worker(worker) }
end