A thread-pool implementation making use of Eventbox
The following class implements a thread-pool with a fixed number of threads to be borrowed by the pool
method.
It shows how the action method start_pool_thread
makes use of the private yield_call next_job
to query, wait for and retrieve an object from the event scope.
This kind of object is the block that is given to pool
.
Although all closures (blocks, procs and lambdas) are wrapped in a way that allows safe calls from the event scope, it is just passed through to the action scope and retrieved as the result value of next_job
.
When this happens, the wrapping is automatically removed, so that the pure block given to pool
is called in start_pool_thread
.
That way each action thread runs one block at the same time, but all started action threads process the blocks concurrently.
class ThreadPool < Eventbox
async_call def init(pool_size)
@que = [] # Initialize an empty job queue
@jobless = [] # Initialize the list of jobless action threads
pool_size.times do # Start up x action threads
start_pool_thread
end
end
# The action call returns immediately, but spawns a new thread.
private action def start_pool_thread
while bl=next_job # Each new thread waits for a job to be pooled
bl.call # Execute the external job enqueued by `pool`
end
end
# Get the next job or wait for one
# The method is private, so that it's accessible in start_pool_thread action but not externally
private yield_call def next_job(result)
if @que.empty? # No job pooled?
@jobless << result # Enqueue the action thread to the list of jobless workers
else # Already pooled jobs?
result.yield @que.shift # Take the oldest job and let next_job return with this job
end
end
# Enqueue a new job
async_call def pool(&block)
if @jobless.empty? # No jobless thread available?
@que << block # Append the external block as job into the queue
else # A thread is waiting?
@jobless.shift.yield block # Take one thread and let next_job return the given job
end # so that it is processed by the pool_thread action above
end
end
This ThreadPool
can be used like so:
tp = ThreadPool.new(3) # Create a thread pool with 3 action threads
5.times do |i| # Start 5 jobs concurrently
tp.pool do # pool never blocks, but enqueues jobs when no free thread is available
sleep 1 # The mission of each job: Wait for 1 second (3 jobs concurrently)
p [i, Thread.current.object_id]
end
end
# It gives something like the following output after 1 second:
[2, 47030774465880]
[1, 47030775602740]
[0, 47030774464940]
# and something like this after one more seconds:
[3, 47030775602740]
[4, 47030774465880]
Eventbox's builtin thread-pool Eventbox::ThreadPool is implemented on top of Eventbox similar to the above. In addition there are various battle proof implementations of thread-pools such a these in concurrent-ruby, which are faster and more feature rich than the above.
However Eventbox comes into play when things are getting more complicated or more customized. Imagine the thread-pool has to schedule it's tasks not just to cheep threads, but to more expensive or more constraint resources. In such cases available abstractions don't fit well to the problem. Instead the above example can be used as a basis for your own extensions.