Class: PCOWS

Inherits:
Object
  • Object
show all
Defined in:
lib/bio-vcf/pcows.rb

Constant Summary collapse

RUNNINGEXT =

file extension

'part'

Instance Method Summary collapse

Constructor Details

#initialize(num_threads, chunk_size, name = File.basename(__FILE__), timeout = 180, quiet = false, debug = false) ⇒ PCOWS

Returns a new instance of PCOWS.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/bio-vcf/pcows.rb', line 9

# Set up the worker bookkeeping for a PCOWS instance.
#
# num_threads - number of workers; a nil/false value falls back to
#               cpu_count()
# chunk_size  - stored as @chunk_size
# name        - prefix for the temporary directory (defaults to the
#               basename of this source file)
# timeout     - stored as @timeout (default 180)
# quiet/debug - logging switches; debug prints a banner on stderr
#
# A temporary directory is only created when multi_threaded is true.
def initialize(num_threads, chunk_size, name = File.basename(__FILE__), timeout = 180, quiet = false, debug = false)
  # FIXME (from original): set to cpu_num by default
  @num_threads = num_threads || cpu_count
  @chunk_size = chunk_size
  @pid_list = []
  @name = name
  @timeout = timeout
  @quiet = quiet
  @debug = debug
  $stderr.print "PCOWS running in DEBUG MODE\n" if @debug
  @tmpdir = Dir.mktmpdir(@name + '_') if multi_threaded
  # index into @pid_list of the next chunk whose output is to be printed
  @last_output = 0
  # false, or the @pid_list entry currently being printed
  @output_locked = false
end

Instance Method Details

#cleanupObject



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/bio-vcf/pcows.rb', line 217

# Kill every worker that is still running and remove the files that
# belong to it: the finished output file, whatever cleanup_keep_file
# handles for it, and the partial file (fn + '.' + RUNNINGEXT).
# Finally cleanup_tmpdir is called.
#
# Informational messages go to stdout unless @quiet is set.
def cleanup()
  @pid_list.each do |info|
    (pid,_count,fn) = info
    # submit_worker records an entry without pid and file name when the
    # worker ran in-process; there is nothing to kill or unlink then.
    next if fn.nil?
    if pid_running?(pid)
      $stderr.print "Killing child ",[info],"\n"
      begin
        Process.kill 9, pid
        Process.wait pid
      rescue Errno::ENOENT
        $stdout.puts "INFO: #{fn} did not exist: Errno::ENOENT" if not @quiet
      rescue Errno::ESRCH
        $stdout.puts "INFO: The process #{pid} did not exist: Errno::ESRCH" if not @quiet
      rescue Errno::ECHILD
        # Workers are detached in submit_worker, so the child may already
        # have been reaped by the time we wait for it.
        $stdout.puts "INFO: The process #{pid} was already reaped: Errno::ECHILD" if not @quiet
      end
    end
    File.unlink(fn) if File.exist?(fn)
    cleanup_keep_file(fn,wait: false)
    tempfn = fn+'.'+RUNNINGEXT
    File.unlink(tempfn) if File.exist?(tempfn)
  end
  cleanup_tmpdir()
end

#process_output(func = nil, type = :by_line, blocking = false) ⇒ Object

In this section the output gets collected and passed on to a

printer thread. This function makes sure the printing is
ordered and that no printers are running at the same
time. The printer thread should be doing as little processing
as possible.

In this implementation type==:by_line prints each line of the output
file to stdout (func is not called). Otherwise func is called once with the filename.


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/bio-vcf/pcows.rb', line 105

# Print the output of the next finished chunk, in submission order,
# making sure only one printer is active at a time.
#
# func     - called once with the chunk file name when type is not
#            :by_line; not used when type == :by_line
# type     - :by_line prints every line of the chunk file with print
# blocking - when false the printing happens in a forked, detached
#            child; when true it happens in the calling process
#
# Does nothing when single_threaded is true. @output_locked holds the
# @pid_list entry that is being printed; the entry is released on a
# later call once its file has been renamed to fn.keep.
def process_output(func=nil,type=:by_line, blocking=false)
  return if single_threaded
  output = lambda { |fn|
    if type == :by_line
      # File.foreach closes the file once all lines are read, so the
      # blocking path does not leave an open handle in this process
      File.foreach(fn) { |buf|
        print buf
      }
    else
      func.call(fn)
    end
  }
  if @output_locked
    # ---- is the other printer still running? We wait until it
    #      is finished to start the next one
    (_pid,_count,fn) = @output_locked
    $stderr.print "Checking for output_lock on existing #{fn}\n" if not @quiet
    return if File.exist?(fn)  # not renamed yet: printer still processing
    # Now we should remove the .keep file
    cleanup_keep_file(fn)
    @last_output += 1          # get next one in line
    @output_locked = false
  end
  # ---- process the next output chunk. After completion it
  #      gets renamed to chunk.keep. This to avoid missing
  #      output (if we unlink the file prematurely)
  if info = @pid_list[@last_output]
    (_pid,_count,fn) = info
    $stderr.print "Testing (#{@last_output}) for output file ",[info],"\n" if @debug
    if File.exist?(fn)
      # Yes! We have the next output, create outputter
      @output_locked = info
      $stderr.print "Set lock on ",[info],"\n" if not @quiet
      if not blocking
        $stderr.print "Processing output file #{fn} (non-blocking)\n" if not @quiet
        printer_pid = fork do
          output.call(fn)
          # after finishing output move it to .keep
          FileUtils::mv(fn,fn+'.keep')
          exit(0)
        end
        Process.detach(printer_pid)
      else
        $stderr.print "Processing output file #{fn} (blocking)\n" if not @quiet
        output.call(fn)
        FileUtils::mv(fn,fn+'.keep')
      end
    else
      # next chunk is not ready yet
      sleep 0.2
    end
  end
end

#process_remaining_outputObject



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/bio-vcf/pcows.rb', line 195

# Flush all output that has not been printed yet and then call
# cleanup_tmpdir. Does nothing when single_threaded is true.
#
# Three phases:
#   1. poll process_output until the current output lock is released
#   2. for every recorded worker, keep printing (blocking) while the
#      worker is running or its output file still exists
#   3. poll (blocking) until the last output lock is released
def process_remaining_output
  return if single_threaded
  $stderr.print "Processing remaining output...\n" unless @quiet

  # phase 1
  while @output_locked
    sleep 0.2
    process_output # keep trying
  end

  # phase 2
  @pid_list.each do |entry|
    worker_pid, _chunk_no, outfile = entry
    while pid_or_file_running?(worker_pid, outfile) || File.exist?(outfile)
      $stderr.print "Trying: ", [entry], "\n" unless @quiet
      process_output(nil, :by_line, true)
      sleep 0.2
    end
  end

  # phase 3
  while @output_locked
    sleep 0.1
    process_output(nil, :by_line, true)
  end

  cleanup_tmpdir
end

#submit_final_worker(func, state) ⇒ Object



64
65
66
67
# File 'lib/bio-vcf/pcows.rb', line 64

# Submit a worker through submit_worker after setting the
# @final_worker flag. Returns whatever submit_worker returns.
def submit_final_worker(func, state)
  @final_worker = true
  submit_worker(func, state)
end

#submit_worker(func, state) ⇒ Object

Feed the worker ‘func and state’ to COWS. Note that func is a lambda closure so it can pick up surrounding scope at invocation in addition to the data captured in ‘state’.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/bio-vcf/pcows.rb', line 33

# Run func.call(state) and print every element it yields.
#
# When multi_threaded is false the call happens in the current process
# and the output is printed immediately. Otherwise a child is forked
# whose STDOUT is redirected to <chunk file>.RUNNINGEXT; when the child
# is done that file is renamed to the chunk file name and the child
# exits. The child is detached.
#
# In both cases an entry [pid, count, fn] is appended to @pid_list (all
# nil for the in-process case) and true is returned.
def submit_worker(func, state)
  unless multi_threaded
    # ---- Single threaded: call in main process and output immediately
    func.call(state).each { |line| print line }
    @pid_list << [nil, nil, nil]
    return true
  end

  chunk_no = @pid_list.size + 1
  outfile = mktmpfilename(chunk_no)
  worker_pid = fork do
    # ---- This runs in the forked child
    partial = outfile + '.' + RUNNINGEXT
    STDOUT.reopen(File.open(partial, 'w+'))
    func.call(state).each { |line| print line }
    STDOUT.flush
    STDOUT.close
    # The rename signals that the chunk is complete
    FileUtils.mv(partial, outfile)
    exit(0)
  end
  Process.detach(worker_pid)
  @pid_list << [worker_pid, chunk_no, outfile]
  true
end

#wait_for_worker(info) ⇒ Object

Wait for a worker slot to appear. When working the pid is writing a file with extension .part(ial). After completion the file is renamed without .part and a slot is free.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/bio-vcf/pcows.rb', line 160

# Wait until the worker described by info = [pid, count, fn] has
# produced its output file fn.
#
# Returns immediately when pid_or_file_running?(pid, fn) is false, and
# returns early when the worker disappears while waiting. Waits at most
# @timeout seconds; on expiry the worker is killed, a FATAL message is
# written to stderr and Timeout::Error is re-raised.
#
# Raises RuntimeError when the wait loop ends without fn existing.
def wait_for_worker(info)
  (pid,count,fn) = info
  if pid_or_file_running?(pid,fn)
    $stderr.print "Waiting up to #{@timeout} seconds for pid=#{pid} to complete #{fn}\n" if not @quiet
    begin
      Timeout.timeout(@timeout) do
        while not File.exist?(fn)  # wait for the result to appear
          sleep 0.2
          return if not pid_or_file_running?(pid,fn) # worker is gone
        end
      end
      # Partial file should have been renamed:
      raise "FATAL: child process #{pid} appears to have crashed #{fn}" if not File.exist?(fn)
      $stderr.print "OK pid=#{pid}, processing starts of #{fn}\n" if not @quiet
    rescue Timeout::Error => timeout_error
      # Kill it to speed up exit. The worker may have exited in the
      # meantime or have been reaped already (workers are detached in
      # submit_worker); that must not mask the timeout itself.
      begin
        Process.kill 9, pid
        Process.wait pid
      rescue Errno::ESRCH, Errno::ECHILD
        # nothing left to kill or reap
      end
      $stderr.print "FATAL: child process killed because it stopped responding, pid = #{pid}, fn = #{fn}, count = #{count}\n"
      $stderr.print "Bailing out"
      raise timeout_error
    end
  end
end

#wait_for_worker_slotObject

Make sure no more than num_threads are running at the same time - this is achieved by checking the PID table and the running files in the tmpdir



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/bio-vcf/pcows.rb', line 73

# Block until fewer than @num_threads of the recorded workers are
# reported as running by pid_or_file_running?. Polls every 0.1 seconds
# and lets Timeout::Error propagate after @timeout seconds.
# Does nothing when single_threaded is true.
def wait_for_worker_slot
  return if single_threaded
  Timeout.timeout(@timeout) do
    announced = false
    while true
      # ---- count the workers that still occupy a slot
      busy = @pid_list.count { |(pid, _count, fn)| pid_or_file_running?(pid, fn) }
      return if busy < @num_threads
      # only mention the wait once
      unless announced
        $stderr.print "Waiting for slot (timeout=#{@timeout})\n" unless @quiet
        announced = true
      end
      sleep 0.1
    end
  end
end

#wait_for_workersObject

This is the final cleanup after the reader thread is done. All workers need to complete.



188
189
190
191
192
193
# File 'lib/bio-vcf/pcows.rb', line 188

# Call wait_for_worker for every entry in @pid_list, in order.
# Does nothing when single_threaded is true.
def wait_for_workers
  return if single_threaded
  @pid_list.each { |worker| wait_for_worker(worker) }
end