Class: JobPool

Inherits:
Object
  • Object
show all
Defined in:
lib/job_pool.rb,
lib/job_pool/job.rb,
lib/job_pool/version.rb

Overview

TODO: take the mutex once in kill_all. TODO: rewrite wait_next.

Defined Under Namespace

Classes: Job, TooManyJobsError

Constant Summary collapse

VERSION =
"0.5"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ JobPool

Returns a new instance of JobPool.



13
14
15
16
17
18
# File 'lib/job_pool.rb', line 13

# Sets up an empty pool.
#
# options - Hash of settings. Only :max_jobs is read here; it is stored
#           unchanged (nil when the key is absent).
def initialize(options = {})
  # Table of the jobs this pool is tracking. `||=` keeps a table that was
  # already assigned before this method ran.
  # TODO: convert this to a hash by child thread?
  @processes ||= []

  # Lock taken around reads and writes of @processes.
  @mutex ||= Mutex.new

  @max_jobs = options[:max_jobs]
end

Instance Attribute Details

#max_jobs ⇒ Object

Returns the value of attribute max_jobs.



11
12
13
# File 'lib/job_pool.rb', line 11

# Reader for the pool's job limit.
#
# Returns the current value of @max_jobs.
def max_jobs
  @max_jobs
end

Instance Method Details

#_add(process) ⇒ Object



68
69
70
71
72
73
74
75
# File 'lib/job_pool.rb', line 68

# Registers a process in the pool's process table.
#
# process - the job object to start tracking.
#
# Returns the process table (the result of Array#push).
# Raises JobPool::TooManyJobsError when @max_jobs is set and the table
# already holds that many entries.
def _add(process)
  @mutex.synchronize do
    if @max_jobs && @processes.count >= @max_jobs
      # Interpolate @max_jobs: that is the variable the limit is checked
      # against. The message used to read @max_processes, which is not where
      # the limit is stored, so the limit was printed as an empty string.
      raise JobPool::TooManyJobsError,
            "launched process #{@processes.count + 1} of #{@max_jobs} maximum"
    end

    @processes.push(process)
  end
end

#_remove(process) ⇒ Object

Removes the process from the process table. Pass a block that cleans up after the process. _remove may be called many times, but the block will only be called once.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/job_pool.rb', line 79

# Takes a process out of the process table, running the caller's cleanup
# block on the way.
#
# process - the job to remove; it must respond to _deactivate.
#
# The block is yielded to only when process._deactivate returns a truthy
# value (presumably that happens once per process -- confirm in Job).
#
# Returns nil.
# Raises RuntimeError if the process is missing from the table when it is
# deactivated, or has already been deleted by the time the block returns.
def _remove(process)
  needs_cleanup = @mutex.synchronize do
    deactivated = process._deactivate
    if deactivated && !@processes.include?(process)
      raise "process not in process table??"
    end
    deactivated
  end

  return unless needs_cleanup

  # The mutex is deliberately not held here: the callback might block.
  yield

  @mutex.synchronize do
    removed = @processes.delete(process)
    raise "someone else deleted process??" unless removed
  end
end

#count ⇒ Object



28
29
30
# File 'lib/job_pool.rb', line 28

# Number of entries currently in the process table, read under the mutex.
def count
  @mutex.synchronize do
    @processes.size
  end
end

#find(&block) ⇒ Object



32
33
34
# File 'lib/job_pool.rb', line 32

# Searches the process table while holding the mutex.
#
# block - predicate handed directly to the table's own #find.
#
# Returns whatever the table's #find returns for that block.
def find(&block)
  @mutex.synchronize do
    @processes.find(&block)
  end
end

#first ⇒ Object



24
25
26
# File 'lib/job_pool.rb', line 24

# First entry of the process table, read under the mutex.
#
# Returns nil when the table is empty.
def first
  @mutex.synchronize do
    @processes[0]
  end
end

#kill_all ⇒ Object



36
37
38
39
40
41
42
# File 'lib/job_pool.rb', line 36

# Calls #kill on whatever #first returns, over and over, until #first
# returns nothing.
#
# TODO: this is racy...  if someone else is starting processes,
# we'll just endless loop.  can we take the mutex once outside the loop?
#
# Returns nil.
def kill_all
  job = first
  while job
    job.kill
    job = first
  end
end

#launch(*args) ⇒ Object



20
21
22
# File 'lib/job_pool.rb', line 20

# Creates a job that belongs to this pool.
#
# args - forwarded untouched to JobPool::Job.new, after the pool itself.
#
# Returns the new JobPool::Job.
def launch(*args)
  JobPool::Job.new(self, *args)
end

#log(msg) ⇒ Object

Called when there’s an error in a job’s subthreads. Never happens during normal usage.



64
65
66
# File 'lib/job_pool.rb', line 64

# Writes a message to standard error.
#
# msg - the text to print.
#
# Returns nil (the result of IO#puts).
def log(msg)
  STDERR.puts(msg)
end

#wait_next(nonblock = nil) ⇒ Object

Blocks until any child process returns (unless nonblock is true; returning nil in that case is still a TODO). Raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/job_pool.rb', line 47

# Waits for the next child thread to finish and returns its job.
#
# nonblock - passed straight through to ThreadsWait#next_wait.
#
# Returns the job whose child thread finished, after calling #stop on it.
# Raises whatever ThreadsWait#next_wait raises (see its documentation for
# the no-threads and nonblocking cases).
def wait_next(nonblock = nil)
  # we wait on child threads since calling waitpid would produce a race condition.

  # Copy the table while holding the mutex, as count/find/first do, so a
  # concurrent _add/_remove cannot change it while we build the lookup.
  # The mutex is released before waiting so other threads are not blocked.
  jobs = @mutex.synchronize { @processes.dup }

  by_thread = {}
  jobs.each { |job| by_thread[job._child_thread] = job }

  # TODO: test nonblock

  finished = ThreadsWait.new(by_thread.keys).next_wait(nonblock)
  job = by_thread[finished]
  job.stop # otherwise the job will be in an indeterminate state
  job
end