Class: JobPool

Inherits:
Object
  • Object
show all
Defined in:
lib/job_pool.rb,
lib/job_pool/job.rb,
lib/job_pool/version.rb

Overview

TODO: take mutex once in kill_all TODO: rewrite wait_next

Defined Under Namespace

Classes: Job, TooManyJobsError

Constant Summary collapse

VERSION =

:nodoc:

"0.6"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ JobPool

## Options

  • max_jobs: the maximum number of jobs that can be running at any one time, or nil if unlimited.



16
17
18
19
20
21
# File 'lib/job_pool.rb', line 16

def initialize(options={})
  @mutex ||= Mutex.new

  @processes ||= []   # TODO: convert this to a hash by child thread?
  @max_jobs = options[:max_jobs]
end

Instance Attribute Details

#max_jobsObject

Returns the value of attribute max_jobs.



12
13
14
# File 'lib/job_pool.rb', line 12

def max_jobs
  @max_jobs
end

Instance Method Details

#_add(process) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/job_pool.rb', line 71

def _add process
  @mutex.synchronize do
    if @max_jobs && @processes.count >= @max_jobs
      raise JobPool::TooManyJobsError.new("launched process #{@processes.count+1} of #{@max_processes} maximum")
    end
    @processes.push process
  end
end

#_remove(process) ⇒ Object

removes process from process table. pass a block that cleans up after the process. _remove may be called lots of times but block will only be called once



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/job_pool.rb', line 82

def _remove process
  cleanup = false

  @mutex.synchronize do
    cleanup = process._deactivate
    raise "process not in process table??" if cleanup && !@processes.include?(process)
  end

  # don't want to hold mutex when calling callback because it might block
  if cleanup
    yield
    @mutex.synchronize do
      value = @processes.delete(process)
      raise "someone else deleted process??" unless value
    end
  end
end

#countObject



31
32
33
# File 'lib/job_pool.rb', line 31

def count
  @mutex.synchronize { @processes.count }
end

#find(&block) ⇒ Object



35
36
37
# File 'lib/job_pool.rb', line 35

def find &block
  @mutex.synchronize { @processes.find(&block) }
end

#firstObject



27
28
29
# File 'lib/job_pool.rb', line 27

def first
  @mutex.synchronize { @processes.first }
end

#kill_allObject



39
40
41
42
43
44
45
# File 'lib/job_pool.rb', line 39

def kill_all
  # TODO: this is racy...  if someone else is starting processes,
  # we'll just endless loop.  can we take the mutex once outside the loop?
  while f = first
    f.kill
  end
end

#launch(*args) ⇒ Object



23
24
25
# File 'lib/job_pool.rb', line 23

def launch *args
  JobPool::Job.new self, *args
end

#log(msg) ⇒ Object

called there’s an error in a job’s subthreads. never happens during normal # usage.



67
68
69
# File 'lib/job_pool.rb', line 67

def log msg
  STDERR.puts msg
end

#wait_next(nonblock = nil) ⇒ Object

blocks until any child process returns (unless nonblock is true, where it returns nil TODO) raises an exception if no processes are running, or if called nonblocking and no processes have finished (see ThreadsWait#next_wait for details).



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/job_pool.rb', line 50

def wait_next nonblock=nil
  # we wait on child threads since calling waitpid would produce a race condition.

  threads = {}
  @processes.each { |p|
    threads[p._child_thread] = p
  }

  # TODO: test nonblock

  thread = ThreadsWait.new(threads.keys).next_wait(nonblock)
  process = threads[thread]
  process.stop # otherwise process will be in an indeterminite state
  process
end