Class: Omnibus::ThreadPool

Inherits:
Object
Defined in:
lib/omnibus/thread_pool.rb

Instance Method Summary

Constructor Details

#initialize(size, abort_on_exception = true) ⇒ ThreadPool

Create a new thread pool with the given number of worker threads. If a block is given, the pool is yielded to it and the constructor then calls #shutdown, so it does not return until every scheduled job has completed.

Examples:

Using a block

ThreadPool.new(5) do |pool|
  complex_things.each do |thing|
    pool.schedule { thing.run }
  end
end

Using the object

pool = ThreadPool.new(5)
# ...
pool.schedule { complex_operation_1 }
pool.schedule { complex_operation_2 }
# ...
pool.schedule { complex_operation_4 }
# ...
pool.shutdown

# or

at_exit { pool.shutdown }

Parameters:

  • size (Integer)

    the number of worker threads to create in the pool

  • abort_on_exception (Boolean) (defaults to: true)

    whether an unhandled exception in a worker thread should also abort the main thread
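
To make the effect of abort_on_exception more concrete, the sketch below uses only the Ruby standard library (no Omnibus classes). It assumes the flag is turned off and shows that, in that case, a failure stays inside the thread until the thread is joined. Note that the constructor source sets the flag through the class-level Thread.abort_on_exception= setter, which in Ruby applies to the whole process rather than to a single thread. The variable names here are illustrative only.

```ruby
# Standard library only; this is not Omnibus code.
Thread.abort_on_exception = false   # process-wide switch, the same setter the constructor calls
Thread.report_on_exception = false  # keep the example quiet on stderr

# A worker whose job fails.
worker = Thread.new { raise ArgumentError, "job failed" }

# With the flag off, the exception is re-raised only when the thread is joined.
captured = begin
  worker.join
  nil
rescue ArgumentError => e
  e.message
end

puts captured
```

With the flag on (the default for this constructor), the same failure would instead be raised in the main thread as soon as it happens, without waiting for a join.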



# File 'lib/omnibus/thread_pool.rb', line 52

def initialize(size, abort_on_exception = true)
  @size = size
  @jobs = Queue.new

  @pool = Array.new(@size) do |i|
    Thread.new do
      Thread.abort_on_exception = abort_on_exception
      Thread.current[:id] = i

      catch(:exit) do
        loop do
          job, args = @jobs.pop
          job.call(*args)
        end
      end
    end
  end

  if block_given?
    yield self
    shutdown
  end
end

Instance Method Details

#schedule(*args, &block) ⇒ void

This method returns an undefined value.

Schedule a single job onto the queue. Any arguments given are passed to the block when a worker thread calls it. This is useful when the job needs values from the enclosing scope as they were at scheduling time.

Parameters:

  • args (Object, Array<Object>)

    the arguments to pass to the block when calling

  • block (Proc)

    the block to execute



# File 'lib/omnibus/thread_pool.rb', line 88

def schedule(*args, &block)
  @jobs << [block, args]
end
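
The following sketch illustrates why passing arguments to #schedule can matter. SketchPool is a hypothetical stand-in condensed from the source listings on this page so that the example runs without the omnibus gem; it is not the library class itself, and it omits the abort_on_exception handling. The point shown is that an argument is bound when the job is scheduled, so reassigning the outer variable afterwards does not change what the job receives.

```ruby
# Hypothetical stand-in, condensed from the listings on this page.
class SketchPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @pool = Array.new(size) do
      Thread.new do
        catch(:exit) do
          loop do
            job, args = @jobs.pop
            job.call(*args)
          end
        end
      end
    end
  end

  # Store the block together with the arguments it should be called with.
  def schedule(*args, &block)
    @jobs << [block, args]
  end

  # One exit job per worker, then wait for every worker to finish.
  def shutdown
    @size.times { schedule { throw :exit } }
    @pool.each(&:join)
    true
  end
end

results = Queue.new # thread-safe collector for job output
pool = SketchPool.new(2)

name = "first"
pool.schedule(name) { |n| results << n } # the job receives the value bound here
name = "second"                          # reassigning later does not affect that job
pool.schedule(name) { |n| results << n }

pool.shutdown

collected = []
collected << results.pop until results.empty?
puts collected.sort.inspect
```

Had the first block read the outer variable name directly instead of taking a parameter, it could have observed the reassigned value, depending on when a worker picked the job up.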

#shutdown ⇒ true

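Stop the thread pool. This method enqueues one exit job per worker thread (a technique sometimes called a “poison pill”) and then waits for all threads to exit. Because the queue is first-in, first-out, jobs scheduled before the call are run before the workers stop.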

Returns:

  • (true)


# File 'lib/omnibus/thread_pool.rb', line 99

def shutdown
  @size.times do
    schedule { throw :exit }
  end

  @pool.map(&:join)

  true
end
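
The shutdown technique can be sketched in isolation with a plain Queue and a few threads. This is a minimal illustration of the poison-pill idea under the assumption of a first-in, first-out queue; the names jobs, done and workers are made up for the example and do not come from Omnibus.

```ruby
jobs = Queue.new # shared work queue
done = Queue.new # thread-safe collector for results

# Three workers that run jobs until one of them throws :exit.
workers = Array.new(3) do
  Thread.new do
    catch(:exit) do
      loop { jobs.pop.call }
    end
  end
end

# Real work goes in first.
10.times { |i| jobs << -> { done << i * i } }

# One poison pill per worker. Each worker stops after taking exactly one,
# and the queue ordering means all real jobs are taken before any pill.
workers.size.times { jobs << -> { throw :exit } }

workers.each(&:join)

puts done.size
```

Enqueuing exactly as many pills as there are workers is what lets every thread leave its loop; with fewer pills, some workers would block forever on an empty queue and join would never return.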