ActionPool

ActionPool is a simple thread pool. It supports various constraints and resizing in an easy and unobtrusive manner. You can set limits on how long tasks are worked on, as well as on the lifetime of a thread. For programs that use lots of threads, it can be helpful to reuse threads instead of constantly recreating them.

install (easy):

gem install actionpool

install (less easy):

git clone http://github.com/spox/actionpool.git
cd actionpool
gem build *.gemspec
gem install ./

install (less easy that’s a little easier):

rip makes it easy to install directly from a GitHub repository.

Testing

ActionPool is currently tested on:

  • Ruby 1.8.6-p383

  • Ruby 1.8.7-p248

  • Ruby 1.9.1-p376

  • JRuby 1.4.0

Documentation

rdocs

Example

Code:

require 'actionpool'

pool = ActionPool::Pool.new
pool.process do
    sleep(2)
    raise 'Wakeup main thread'
end
20.times do
    pool.process do
        puts "Thread: #{Thread.current}"
        sleep(0.1)
    end
end
begin
    sleep
rescue Exception => e
    puts "Thread pool woke me up: #{e}"
end

Result:

Thread: #<Thread:0x93ebeb8>
Thread: #<Thread:0x93eb92c>
Thread: #<Thread:0x93eb8a0>
Thread: #<Thread:0x93eb814>
Thread: #<Thread:0x93eb788>
Thread: #<Thread:0x93eb670>
Thread: #<Thread:0x93eb5e4>
Thread: #<Thread:0x93eb558>
Thread: #<Thread:0x93eb4cc>
Thread: #<Thread:0x93ebeb8>
Thread: #<Thread:0x93eb92c>
Thread: #<Thread:0x93eb8a0>
Thread: #<Thread:0x93eb814>
Thread: #<Thread:0x93eb788>
Thread: #<Thread:0x93eb670>
Thread: #<Thread:0x93eb5e4>
Thread: #<Thread:0x93eb558>
Thread: #<Thread:0x93eb4cc>
Thread: #<Thread:0x93eb92c>
Thread: #<Thread:0x93eb8a0>
Thread pool woke me up: Wakeup main thread

Important note

The worker threads in the ActionPool catch every Exception that your block fails to catch. Instead of silently discarding these exceptions, the workers pass them back to the thread that created the pool (generally the main thread). Keep this in mind if you care about handling unexpected exceptions. It is also what allows the example code above to be woken up from its sleep.
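The hand-off described above can be sketched with nothing but the standard library. This is only an illustration of the idea, not ActionPool's actual implementation: run_and_forward and wait_for_worker_error are hypothetical helpers, and the sketch assumes the exception is delivered with Thread#raise.

```ruby
# Hypothetical helper, not part of ActionPool: run a block in a worker
# thread and forward any uncaught exception to the thread that called us.
def run_and_forward(&task)
  owner = Thread.current
  Thread.new do
    begin
      task.call
    rescue Exception => e
      owner.raise(e) # re-raise the exception inside the owner thread
    end
  end
end

# Starts a failing task, then sleeps until the forwarded exception arrives.
# Returns the exception message, or nil if nothing was raised in time.
def wait_for_worker_error
  run_and_forward do
    sleep(0.1)
    raise 'Wakeup main thread'
  end
  sleep(5) # interrupted as soon as the worker forwards its exception
  nil
rescue Exception => e
  e.message
end

puts "Thread pool woke me up: #{wait_for_worker_error}"
```

If you would rather not have the creating thread interrupted, rescue inside the block you hand to the pool so that nothing escapes to the worker.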

The internals

Probably more information than you really want

ActionPool has a few simple settings that control how it works. First, the pool has a minimum and maximum number of allowed threads. On initialization, the minimum number of threads is created and put into the pool. By default, this is 10 threads. As the number of tasks added to the pool increases, the pool will grow as needed. When there are more tasks in the pool than threads to process them, new threads are added to the pool until the maximum thread threshold is reached. Taking the example above, we can demonstrate this easily by adjusting our limits:

require 'actionpool'

pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 3)
pool.process do
    sleep(10)
    raise 'Wakeup main thread'
end
20.times do
    pool.process do
        puts "Thread: #{Thread.current}"
        sleep(rand(0.0))
    end
end
begin
    sleep
rescue Exception => e
    puts "Thread pool woke me up: #{e}"
end

Which results in:

Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1760>
Thread: #<Thread:0x86c1080>
Thread pool woke me up: Wakeup main thread

Our pool starts with a single thread that is occupied by the sleeping task waiting to raise an exception. As we begin to add new tasks, the pool grows to accommodate the growing number of tasks, until it reaches the maximum threshold of 3. At that point, the pool simply processes the tasks until the task list is empty.
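To make the growth rule concrete, here is a minimal sketch of a pool that starts at a minimum size and adds workers only while work is waiting and the maximum has not been reached. TinyPool and grow_demo are hypothetical names used for illustration. ActionPool's real internals are not shown here and may differ.

```ruby
require 'thread'

# Hypothetical TinyPool, for illustration only.
class TinyPool
  def initialize(min, max)
    @max = max
    @queue = Queue.new
    @lock = Mutex.new
    @threads = []
    min.times { spawn_worker }
  end

  # Number of worker threads created so far.
  def size
    @lock.synchronize { @threads.size }
  end

  def process(&task)
    @queue << task
    @lock.synchronize do
      # Grow only while work is waiting and the ceiling is not reached.
      spawn_worker if @queue.size > 0 && @threads.size < @max
    end
  end

  # Let queued tasks finish, then stop every worker.
  def shutdown
    workers = @lock.synchronize { @threads.dup }
    workers.size.times { @queue << :stop }
    workers.each { |t| t.join }
  end

  private

  def spawn_worker
    @threads << Thread.new do
      while (task = @queue.pop) != :stop
        task.call
      end
    end
  end
end

# Runs 20 short tasks through a pool limited to between 1 and 3 threads.
def grow_demo
  pool = TinyPool.new(1, 3)
  done = 0
  lock = Mutex.new
  20.times do
    pool.process do
      sleep(0.05)
      lock.synchronize { done += 1 }
    end
  end
  pool.shutdown
  [pool.size, done]
end

size, done = grow_demo
puts "Threads: #{size}, tasks finished: #{done}"
```

The thread count never exceeds the maximum of 3, and all 20 tasks complete. How quickly the pool reaches the maximum depends on scheduling.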

The pool can also limit the amount of time a thread spends working on a given task. By default, a thread will work on a given task until the task is completed or the pool is shut down. However, as the following example shows, it is very easy to limit this time so the pool does not get bogged down by long-running tasks:

require 'actionpool'

pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1, :a_to => 1)
pool.process do
    puts "#{Time.now}: I'm a long running task"
    sleep(100)
    raise 'Wakeup main thread'
end
pool.process do
    puts "#{Time.now}: Waiting for my turn"
    raise "I'm waking up the main thread"
end
begin
    sleep
rescue Exception => e
    puts "Thread pool woke me up: #{e}"
end

Results:

2009-10-10 08:47:08 -0700: I'm a long running task
2009-10-10 08:47:09 -0700: Waiting for my turn
Thread pool woke me up: I'm waking up the main thread
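For comparison, a per-task time limit can be sketched with Ruby's standard Timeout module. This is an assumption about one way to get similar behavior, not a description of how the :a_to option is implemented. run_with_limit is a hypothetical helper.

```ruby
require 'timeout'

# Hypothetical helper: give a task at most `limit` seconds, then abandon it.
def run_with_limit(limit)
  Timeout.timeout(limit) { yield }
  :finished
rescue Timeout::Error
  :timed_out
end

puts run_with_limit(0.1) { sleep(5) } # long-running task is cut off
puts run_with_limit(1) { :quick }     # short task completes normally
```

As in the example above, a task that is cut off never reaches the code after its long-running call.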

If you have a number of tasks you would like to schedule at once, it is easy with the add_jobs method:

require 'actionpool'

pool = ActionPool::Pool.new
a = 0
lock = Mutex.new
tasks = [].fill(lambda{ lock.synchronize{ a += 1 } }, 0..19)
pool.add_jobs(tasks)
pool.shutdown
puts "Result: #{a}"

Results:

Result: 20

Arguments can also be passed to tasks:

require 'actionpool'

pool = ActionPool::Pool.new
string = 'Hello world'
puts "Original: #{string}. ID: #{string.object_id}"
pool << [lambda{|var| puts "Passed: #{var}. ID: #{var.object_id}"}, [string.dup]]
pool << [lambda{|a,b| puts "Passed: #{a} | #{b}. ID: #{a.object_id} | #{b.object_id}"}, [string, string.dup]]
pool.shutdown

Results:

Original: Hello world. ID: 70651630
Passed: Hello world. ID: 70651250
Passed: Hello world | Hello world. ID: 70651630 | 70651100
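The object IDs above show that an argument passed without dup is the very same object the caller holds, so a task that mutates it changes the caller's copy too. The plain-Ruby sketch below illustrates that difference without using the pool. mutate_demo is a hypothetical helper.

```ruby
# Hypothetical helper: contrast passing a copy with passing the original.
def mutate_demo
  original = String.new('Hello world')
  shout = lambda { |var| var << '!' }

  shout.call(original.dup)  # the task works on a copy
  after_copy = original.dup # snapshot: the original is still unchanged

  shout.call(original)      # the task works on the caller's object
  [after_copy, original]
end

after_copy, after_shared = mutate_demo
puts "After passing a copy: #{after_copy}"
puts "After passing the original: #{after_shared}"
```

Passing string.dup, as the first task above does, is the safer choice when several tasks may touch the same value.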

Last remarks

If you find any bugs, please report them through GitHub. If you need help, you can generally find me on DALnet and Freenode.

License

ActionPool is licensed under the LGPLv3
Copyright (c) 2009 spox