Class: JobPool
- Inherits: Object
- Defined in: lib/job_pool.rb, lib/job_pool/job.rb, lib/job_pool/version.rb
Overview
TODO: take mutex once in kill_all
TODO: rewrite wait_next
Defined Under Namespace
Classes: Job, TooManyJobsError
Constant Summary
- VERSION = "0.6"
Instance Attribute Summary
- #max_jobs ⇒ Object
  Returns the value of attribute max_jobs.
Instance Method Summary
- #_add(process) ⇒ Object
- #_remove(process) ⇒ Object
  Removes process from the process table.
- #count ⇒ Object
- #find(&block) ⇒ Object
- #first ⇒ Object
- #initialize(options = {}) ⇒ JobPool
  Constructor. Options: max_jobs, the maximum number of jobs that can be running at any one time, or nil if unlimited.
- #kill_all ⇒ Object
- #launch(*args) ⇒ Object
- #log(msg) ⇒ Object
  Called when there's an error in a job's subthreads.
- #wait_next(nonblock = nil) ⇒ Object
  Blocks until any child process returns. If nonblock is true it does not block (returning nil in that case is still a TODO). Raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).
Constructor Details
#initialize(options = {}) ⇒ JobPool
Options:
- max_jobs: the maximum number of jobs that can be running at any one time, or nil if unlimited.
# File 'lib/job_pool.rb', line 16
def initialize(options={})
  @mutex ||= Mutex.new
  @processes ||= [] # TODO: convert this to a hash by child thread?
  @max_jobs = options[:max_jobs]
end
Instance Attribute Details
#max_jobs ⇒ Object
Returns the value of attribute max_jobs.
# File 'lib/job_pool.rb', line 12
def max_jobs
  @max_jobs
end
Instance Method Details
#_add(process) ⇒ Object
# File 'lib/job_pool.rb', line 71
def _add process
  @mutex.synchronize do
    if @max_jobs && @processes.count >= @max_jobs
      # report the configured limit (@max_jobs; @max_processes is never set)
      raise JobPool::TooManyJobsError.new("launched process #{@processes.count+1} of #{@max_jobs} maximum")
    end
    @processes.push process
  end
end
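The capacity check in _add only works because the test and the push happen under the same lock. A minimal sketch of that pattern follows; BoundedTable and CapacityError are hypothetical stand-ins invented for illustration, not part of job_pool.

```ruby
# Hypothetical stand-in for the pattern used by JobPool#_add.
# BoundedTable and CapacityError are illustrative names only.
class BoundedTable
  class CapacityError < RuntimeError; end

  def initialize(max = nil)
    @mutex = Mutex.new
    @items = []
    @max = max # nil means unlimited
  end

  # Check the limit and append under one lock, so two threads
  # cannot both slip past the limit at the same time.
  def add(item)
    @mutex.synchronize do
      if @max && @items.count >= @max
        raise CapacityError, "item #{@items.count + 1} exceeds maximum of #{@max}"
      end
      @items.push(item)
    end
    item
  end

  def count
    @mutex.synchronize { @items.count }
  end
end

table = BoundedTable.new(2)
table.add(:a)
table.add(:b)
overflow_message =
  begin
    table.add(:c)
    nil
  rescue BoundedTable::CapacityError => e
    e.message
  end
```

The third add is rejected and the table still holds two items, which is the behavior a caller of launch would presumably see as TooManyJobsError.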
#_remove(process) ⇒ Object
Removes process from the process table. Pass a block that cleans up after the process. _remove may be called many times, but the block is called only once.
# File 'lib/job_pool.rb', line 82
def _remove process
  cleanup = false

  @mutex.synchronize do
    cleanup = process._deactivate
    raise "process not in process table??" if cleanup && !@processes.include?(process)
  end

  # don't want to hold mutex when calling callback because it might block
  if cleanup
    yield
    @mutex.synchronize do
      value = @processes.delete(process)
      raise "someone else deleted process??" unless value
    end
  end
end
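The once-only guarantee of _remove can be illustrated in isolation. The sketch below assumes that _deactivate returns true only for the first caller; Job's source is not shown on this page, so that is an assumption. Entry and OnceTable are hypothetical names, not job_pool classes.

```ruby
# Hypothetical illustration of "callback runs once, outside the lock".
# Entry and OnceTable are invented for this sketch.
class Entry
  def initialize
    @active = true
    @lock = Mutex.new
  end

  # Returns true only for the first caller, false afterwards
  # (assumed to mirror what Job#_deactivate does).
  def deactivate
    @lock.synchronize do
      was_active = @active
      @active = false
      was_active
    end
  end
end

class OnceTable
  def initialize
    @mutex = Mutex.new
    @entries = []
  end

  def add(entry)
    @mutex.synchronize { @entries.push(entry) }
  end

  def count
    @mutex.synchronize { @entries.count }
  end

  # Returns true if this call ran the cleanup block, false otherwise.
  def remove(entry)
    cleanup = @mutex.synchronize { entry.deactivate }
    return false unless cleanup

    yield # run the possibly slow callback without holding the table lock
    @mutex.synchronize { @entries.delete(entry) }
    true
  end
end

table = OnceTable.new
entry = Entry.new
table.add(entry)
calls = 0
results = 3.times.map { table.remove(entry) { calls += 1 } }
```

Only the first remove runs the block; later calls return early, so repeated removal is harmless.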
#count ⇒ Object
# File 'lib/job_pool.rb', line 31
def count
  @mutex.synchronize { @processes.count }
end
#find(&block) ⇒ Object
# File 'lib/job_pool.rb', line 35
def find &block
  @mutex.synchronize { @processes.find(&block) }
end
#first ⇒ Object
# File 'lib/job_pool.rb', line 27
def first
  @mutex.synchronize { @processes.first }
end
#kill_all ⇒ Object
# File 'lib/job_pool.rb', line 39
def kill_all
  # TODO: this is racy... if someone else is starting processes,
  # we'll just endless loop. can we take the mutex once outside the loop?
  while f = first
    f.kill
  end
end
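One possible way to address the TODO in kill_all is to copy the process table under the mutex once and then kill each job outside the lock. This is only a sketch of that idea, not the library's implementation; SnapshotPool and FakeJob are hypothetical, and it assumes that killing a job ends up removing it from the pool.

```ruby
# Hypothetical stand-ins; SnapshotPool and FakeJob are not job_pool classes.
class FakeJob
  def initialize(pool)
    @pool = pool
  end

  # Assumed behavior: killing a job removes it from its pool,
  # which takes the pool mutex.
  def kill
    @pool.remove(self)
  end
end

class SnapshotPool
  def initialize
    @mutex = Mutex.new
    @jobs = []
  end

  def launch
    job = FakeJob.new(self)
    @mutex.synchronize { @jobs.push(job) }
    job
  end

  def remove(job)
    @mutex.synchronize { @jobs.delete(job) }
  end

  def count
    @mutex.synchronize { @jobs.count }
  end

  # Copy the table under one lock, then kill outside it. Ruby's Mutex is
  # not reentrant, so calling kill while holding @mutex would raise ThreadError.
  def kill_all
    snapshot = @mutex.synchronize { @jobs.dup }
    snapshot.each(&:kill)
    snapshot.size
  end
end

pool = SnapshotPool.new
3.times { pool.launch }
killed = pool.kill_all
```

The trade-off is that jobs launched after the snapshot is taken are not killed, but the loop always terminates even if another thread keeps launching.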
#launch(*args) ⇒ Object
# File 'lib/job_pool.rb', line 23
def launch *args
  JobPool::Job.new self, *args
end
#log(msg) ⇒ Object
Called when there's an error in a job's subthreads. Never happens during normal usage.
# File 'lib/job_pool.rb', line 67
def log msg
  STDERR.puts msg
end
#wait_next(nonblock = nil) ⇒ Object
Blocks until any child process returns. If nonblock is true it does not block (returning nil in that case is still a TODO). Raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).
# File 'lib/job_pool.rb', line 50
def wait_next nonblock=nil
  # we wait on child threads since calling waitpid would produce a race condition.

  threads = {}
  @processes.each { |p|
    threads[p._child_thread] = p
  }

  # TODO: test nonblock

  thread = ThreadsWait.new(threads.keys).next_wait(nonblock)
  process = threads[thread]
  process.stop # otherwise process will be in an indeterminate state
  process
end
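wait_next relies on ThreadsWait, which, as far as I know, is no longer bundled with recent Ruby versions and has to be installed as the thwait gem. The sketch below shows the same "wait for whichever thread finishes first" idea using only the core Queue class. It is a swapped-in technique rather than what job_pool does, it covers the blocking case only, and first_finished is a hypothetical helper name.

```ruby
# Hypothetical helper, not part of job_pool: blocks until any of the
# given threads finishes and returns that thread.
def first_finished(threads)
  raise ArgumentError, "no threads to wait on" if threads.empty?

  done = Queue.new
  watchers = threads.map do |t|
    Thread.new do
      begin
        t.join
      rescue StandardError
        # a thread that died with an error still counts as finished
      end
      done << t
    end
  end

  winner = done.pop      # blocks until the first watcher reports in
  watchers.each(&:kill)  # the remaining watchers are no longer needed
  winner
end

slow = Thread.new { sleep 0.5; :slow }
fast = Thread.new { sleep 0.05; :fast }
winner = first_finished([slow, fast])
```

In a pool, the returned thread could then be mapped back to its process through a hash, just as wait_next does with its threads table.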