Class: Omnibus::ThreadPool

Inherits:
Object
Defined in:
lib/omnibus/thread_pool.rb

Constructor Details

#initialize(size) ⇒ ThreadPool

Create a new thread pool of the given size. If a block is given, the pool is yielded to the block and then shut down, so the constructor does not return until every scheduled job has completed.

Examples:

Using a block

ThreadPool.new(5) do |pool|
  complex_things.each do |thing|
    pool.schedule { thing.run }
  end
end

Using the object

pool = ThreadPool.new(5)
# ...
pool.schedule { complex_operation_1 }
pool.schedule { complex_operation_2 }
# ...
pool.schedule { complex_operation_4 }
# ...
pool.shutdown

# or

at_exit { pool.shutdown }

# File 'lib/omnibus/thread_pool.rb', line 52

def initialize(size)
  @size = size
  @jobs = Queue.new

  @pool = Array.new(@size) do |i|
    Thread.new do
      Thread.abort_on_exception = true
      Thread.current[:id] = i

      catch(:exit) do
        loop do
          job, args = @jobs.pop
          job.call(*args)
        end
      end
    end
  end

  if block_given?
    yield self
    shutdown
  end
end

Instance Method Details

#schedule(*args, &block) ⇒ void

This method returns an undefined value.

Schedule a single job onto the queue. Any arguments given are passed to the block when a worker thread calls it. This is useful when the block needs values passed in from a parent binding.
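
A minimal sketch of passing arguments through the queue. So that it runs without the omnibus gem installed, it uses a hypothetical stand-in class, SketchPool, which copies the behaviour of the source listings on this page. The variable names and the results queue are illustrative assumptions, not part of Omnibus.

```ruby
# Hypothetical stand-in that mirrors the listings on this page; it is not
# the Omnibus class itself.
class SketchPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @pool = Array.new(size) do
      Thread.new do
        catch(:exit) do
          loop do
            job, args = @jobs.pop
            job.call(*args)
          end
        end
      end
    end
  end

  def schedule(*args, &block)
    @jobs << [block, args]
  end

  def shutdown
    @size.times { schedule { throw :exit } }
    @pool.each(&:join)
    true
  end
end

results = Queue.new # thread-safe collector for the job output
pool = SketchPool.new(2)

name = "first"
pool.schedule(name) { |n| results << n } # the job receives the object bound to name right now
name = "second"                          # rebinding later does not affect the queued job
pool.schedule(name) { |n| results << n }

stopped = pool.shutdown

collected = []
collected << results.pop until results.empty?
```

Had the blocks referred to name directly instead of taking it as a parameter, a job that ran after the reassignment would see the new value. Passing the value through schedule avoids that dependence on timing.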


# File 'lib/omnibus/thread_pool.rb', line 88

def schedule(*args, &block)
  @jobs << [block, args]
end

#shutdown ⇒ true

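Stop the thread pool. This method enqueues one exit job per worker thread (a technique sometimes called a “poison pill”) and then waits for all threads to exit. Jobs scheduled before the call still run, because the exit jobs sit behind them in the queue.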
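
The exit mechanism can be sketched with a bare Queue and a few threads, without the class around it. This is an illustration under assumptions rather than the Omnibus code: the worker count, the squaring jobs, and the names jobs, done and workers are all made up for the example.

```ruby
jobs = Queue.new
done = Queue.new # thread-safe collector for the job output

# Each worker loops forever, popping and calling jobs, until a job throws :exit.
workers = Array.new(3) do
  Thread.new do
    catch(:exit) do
      loop do
        job = jobs.pop
        job.call
      end
    end
  end
end

# Ordinary work goes onto the queue first.
5.times { |i| jobs << -> { done << i * i } }

# One exit job per worker. A worker stops after taking one, so each exit job
# ends exactly one thread, and the queue order puts them behind the real work.
workers.size.times { jobs << -> { throw :exit } }

workers.each(&:join)
still_alive = workers.count(&:alive?)

squares = []
squares << done.pop until done.empty?
```

The throw is raised inside the worker thread that calls the job, so it unwinds to that thread's own catch(:exit) and ends only that worker's loop.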


# File 'lib/omnibus/thread_pool.rb', line 99

def shutdown
  @size.times do
    schedule { throw :exit }
  end

  @pool.map(&:join)

  true
end