Class: JobPool
- Inherits: Object
- Hierarchy: Object, JobPool
- Defined in: lib/job_pool.rb, lib/job_pool/job.rb, lib/job_pool/version.rb
Overview
TODO: take mutex once in kill_all
TODO: rewrite wait_next
Defined Under Namespace
Classes: Job, TooManyJobsError
Constant Summary
- VERSION = "0.5"
Instance Attribute Summary
- #max_jobs ⇒ Object
  Returns the value of attribute max_jobs.
Instance Method Summary
- #_add(process) ⇒ Object
- #_remove(process) ⇒ Object
  Removes process from the process table.
- #count ⇒ Object
- #find(&block) ⇒ Object
- #first ⇒ Object
- #initialize(options = {}) ⇒ JobPool (constructor)
  A new instance of JobPool.
- #kill_all ⇒ Object
- #launch(*args) ⇒ Object
- #log(msg) ⇒ Object
  Called when there’s an error in a job’s subthreads.
- #wait_next(nonblock = nil) ⇒ Object
  Blocks until any child process returns, unless nonblock is true (TODO: return nil in that case). Raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).
Constructor Details
#initialize(options = {}) ⇒ JobPool
Returns a new instance of JobPool.
# File 'lib/job_pool.rb', line 13

def initialize(options={})
  @mutex ||= Mutex.new
  @processes ||= []   # TODO: convert this to a hash by child thread?
  @max_jobs = options[:max_jobs]
end
Instance Attribute Details
#max_jobs ⇒ Object
Returns the value of attribute max_jobs.
# File 'lib/job_pool.rb', line 11

def max_jobs
  @max_jobs
end
Instance Method Details
#_add(process) ⇒ Object
# File 'lib/job_pool.rb', line 68

def _add process
  @mutex.synchronize do
    # refuse new processes once the pool is at its configured limit
    if @max_jobs && @processes.count >= @max_jobs
      raise JobPool::TooManyJobsError.new("launched process #{@processes.count+1} of #{@max_jobs} maximum")
    end
    @processes.push process
  end
end
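The limit check in _add can be tried in isolation. What follows is a minimal sketch, not the gem's code: TinyPool and its nested TooManyJobsError are hypothetical stand-ins that mirror only the mutex-guarded check, and the sketch assumes that a max_jobs of nil means "no limit".

```ruby
# Hypothetical stand-in for JobPool; mirrors only the limit check in _add.
class TinyPool
  class TooManyJobsError < StandardError; end

  def initialize(options = {})
    @mutex = Mutex.new
    @processes = []
    @max_jobs = options[:max_jobs]   # nil is assumed to mean "no limit"
  end

  def add(process)
    @mutex.synchronize do
      if @max_jobs && @processes.count >= @max_jobs
        raise TooManyJobsError,
              "launched process #{@processes.count + 1} of #{@max_jobs} maximum"
      end
      @processes.push(process)
    end
  end

  def count
    @mutex.synchronize { @processes.count }
  end
end

pool = TinyPool.new(max_jobs: 2)
pool.add(:a)
pool.add(:b)
begin
  pool.add(:c)                       # third entry exceeds the limit
rescue TinyPool::TooManyJobsError => e
  warn e.message
end
```

Raising inside synchronize releases the mutex on the way out, so a rejected add leaves the pool usable.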
#_remove(process) ⇒ Object
Removes process from the process table. Pass a block that cleans up after the process. _remove may be called many times, but the block will only be called once.
# File 'lib/job_pool.rb', line 79

def _remove process
  cleanup = false

  @mutex.synchronize do
    cleanup = process._deactivate
    raise "process not in process table??" if cleanup && !@processes.include?(process)
  end

  # don't want to hold mutex when calling callback because it might block
  if cleanup
    yield
    @mutex.synchronize do
      value = @processes.delete(process)
      raise "someone else deleted process??" unless value
    end
  end
end
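The "block is called only once" contract of _remove can be illustrated with a small sketch. FakeJob and OncePool are hypothetical names, and the sketch assumes that _deactivate returns true only on its first call; the real Job#_deactivate is not shown on this page.

```ruby
# Hypothetical job: _deactivate is assumed to return true only the first time.
class FakeJob
  def initialize
    @active = true
    @lock = Mutex.new
  end

  def _deactivate
    @lock.synchronize do
      was_active = @active
      @active = false
      was_active
    end
  end
end

# Hypothetical pool that mirrors the structure of _remove.
class OncePool
  def initialize
    @mutex = Mutex.new
    @processes = []
  end

  def add(job)
    @mutex.synchronize { @processes.push(job) }
  end

  def count
    @mutex.synchronize { @processes.count }
  end

  def remove(job)
    cleanup = @mutex.synchronize { job._deactivate }
    return unless cleanup

    yield   # runs outside the mutex, so a slow cleanup cannot block the pool
    @mutex.synchronize { @processes.delete(job) }
  end
end

pool = OncePool.new
job = FakeJob.new
pool.add(job)

cleanups = 0
3.times { pool.remove(job) { cleanups += 1 } }   # only the first call cleans up
```

Keeping the yield outside the synchronized sections follows the comment in the original method: the callback may block, and the pool should stay usable while it runs.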
#count ⇒ Object
# File 'lib/job_pool.rb', line 28

def count
  @mutex.synchronize { @processes.count }
end
#find(&block) ⇒ Object
# File 'lib/job_pool.rb', line 32

def find &block
  @mutex.synchronize { @processes.find(&block) }
end
#first ⇒ Object
# File 'lib/job_pool.rb', line 24

def first
  @mutex.synchronize { @processes.first }
end
#kill_all ⇒ Object
# File 'lib/job_pool.rb', line 36

def kill_all
  # TODO: this is racy... if someone else is starting processes,
  # we'll just endless loop.  can we take the mutex once outside the loop?
  while f = first
    f.kill
  end
end
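One possible answer to the TODO, offered only as a sketch: take the mutex once, copy the process list, and kill from the copy after releasing the mutex. SnapshotPool and KillableJob are hypothetical stand-ins, and the sketch assumes that killing a job re-enters the pool to remove itself. Ruby's Mutex is not reentrant, which is why the kills happen outside the synchronized block.

```ruby
# Hypothetical stand-ins; the real Job#kill is not shown on this page.
class SnapshotPool
  def initialize
    @mutex = Mutex.new
    @processes = []
  end

  def add(job)
    @mutex.synchronize { @processes.push(job) }
  end

  def remove(job)
    @mutex.synchronize { @processes.delete(job) }
  end

  def count
    @mutex.synchronize { @processes.count }
  end

  def kill_all
    # Take the mutex once, copy the list, and release it before killing.
    snapshot = @mutex.synchronize { @processes.dup }
    snapshot.each(&:kill)
    snapshot.size
  end
end

class KillableJob
  attr_reader :killed

  def initialize(pool)
    @pool = pool
    @killed = false
    pool.add(self)
  end

  def kill
    @killed = true
    @pool.remove(self)   # re-enters the pool, so the pool mutex must be free
  end
end

pool = SnapshotPool.new
jobs = Array.new(3) { KillableJob.new(pool) }
killed = pool.kill_all
```

This bounds the loop to the jobs present at the time of the snapshot, so it cannot spin forever. It narrows the race without removing it: jobs added after the snapshot are left running.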
#launch(*args) ⇒ Object
# File 'lib/job_pool.rb', line 20

def launch *args
  JobPool::Job.new self, *args
end
#log(msg) ⇒ Object
Called when there’s an error in a job’s subthreads. Never happens during normal usage.
# File 'lib/job_pool.rb', line 64

def log msg
  STDERR.puts msg
end
#wait_next(nonblock = nil) ⇒ Object
Blocks until any child process returns, unless nonblock is true (TODO: return nil in that case). Raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).
# File 'lib/job_pool.rb', line 47

def wait_next nonblock=nil
  # we wait on child threads since calling waitpid would produce a race condition.

  threads = {}
  @processes.each { |p|
    threads[p._child_thread] = p
  }

  # TODO: test nonblock

  thread = ThreadsWait.new(threads.keys).next_wait(nonblock)
  process = threads[thread]
  process.stop # otherwise process will be in an indeterminate state
  process
end
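ThreadsWait comes from the thwait library, which newer Ruby versions no longer ship by default. As a rough illustration of the same "wait for whichever thread finishes first" idea using only core classes, here is a sketch built on Queue. The helper name wait_for_first is hypothetical and is not part of JobPool; it covers only the blocking case.

```ruby
# Hypothetical helper: returns the first of +threads+ to finish.
def wait_for_first(threads)
  raise ArgumentError, "no threads to wait on" if threads.empty?

  done = Queue.new
  watchers = threads.map do |t|
    Thread.new do
      t.join rescue nil   # a job that raised still counts as finished
      done << t
    end
  end

  first = done.pop          # blocks until some watcher reports in
  watchers.each(&:kill)     # the remaining watchers are no longer needed
  first
end

gate = Queue.new
slow = Thread.new { gate.pop }   # stays blocked until the gate is opened
fast = Thread.new { :finished }

first = wait_for_first([slow, fast])
gate << :open
slow.join
```

As in wait_next, the caller would then map the returned thread back to its job, for example through a hash keyed by thread.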