Workers
Workers is a Ruby gem for performing work in background threads. Design goals include high performance, low latency, a simple API, customizability, and a multi-layered architecture. It provides a number of simple-to-use classes that solve a wide range of concurrency problems. It is used by Tribe to implement event-driven actors.
Installation
Add this line to your application's Gemfile:
gem 'workers'
And then execute:
$ bundle
Or install it yourself as:
$ gem install workers
Parallel Map
Parallel map is the simplest way to get started with the Workers gem. It is similar to Ruby's built-in Array#map method except each element is mapped in parallel.
Workers.map([1, 2, 3, 4, 5]) { |i| i * i }
Any exception raised while mapping will cause the entire map call to fail. If your block is prone to temporary failures (exceptions), you can retry it.
Workers.map([1, 2, 3, 4, 5], :max_tries => 100) do |i|
  if rand <= 0.5
    puts "Sometimes I like to fail while computing #{i} * #{i}."
    raise 'sad face'
  end
  i * i
end
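To make the retry semantics concrete, here is a minimal sketch in plain Ruby (standard library only). It is not the gem's implementation: `parallel_map_with_retries` is a hypothetical helper, and it starts one thread per element instead of using a worker pool. It only illustrates the behavior described above: results keep their input order, each element is retried up to `max_tries` times, and a failure that outlasts its retries fails the whole call.

```ruby
# Hypothetical helper for illustration; not part of the Workers gem.
def parallel_map_with_retries(items, max_tries: 1, &block)
  threads = items.map do |item|
    Thread.new do
      # Failures are re-raised by Thread#value below, so skip the default report.
      Thread.current.report_on_exception = false
      tries = 0
      begin
        tries += 1
        block.call(item)
      rescue StandardError
        retry if tries < max_tries
        raise # Out of attempts: let the exception escape the thread.
      end
    end
  end
  # Thread#value waits for each thread and re-raises its exception, if any.
  threads.map(&:value)
end

squares = parallel_map_with_retries([1, 2, 3, 4, 5]) { |i| i * i }
puts squares.inspect
```

Because the threads are collected in input order, the result array lines up with the input even when elements finish out of order.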
Tasks
Tasks and task groups provide more flexibility than parallel map. The main benefit is that you get to decide how you want to handle exceptions.
# Create a task group (it contains a pool of worker threads).
group = Workers::TaskGroup.new
# Add tasks to the group.
10.times do |i|
  10.times do |j|
    group.add do
      group.synchronize { puts "Computing #{i} * #{j}..." }
      i * j # Last statement executed is the result of the task.
    end
  end
end
# Execute the tasks (blocks until the tasks complete).
# Returns true if all tasks succeed. False if any fail.
group.run
# Return an array of all the tasks.
group.tasks
# Return an array of the successful tasks.
group.successes
# Return an array of the failed tasks (raised an exception).
group.failures
# Review the results.
group.tasks.each do |t|
  t.succeeded? # True or false (false if an exception occurred).
  t.failed?    # True or false (true if an exception occurred).
  t.input      # Input value.
  t.result     # Output value (the result of i * j in this example).
  t.exception  # The exception if one exists.
  t.max_tries  # Maximum number of attempts.
  t.tries      # Actual number of attempts.
end
Note that instances of TaskGroup provide a 'synchronize' method. This method uses a mutex so you can serialize portions of your tasks that aren't thread safe.
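The pattern behind 'synchronize' can be sketched with Ruby's built-in Mutex. This example does not use the gem; `counter` and `lock` are illustrative names. It shows several threads updating shared state, with the update serialized through a mutex so no increment is lost.

```ruby
# Illustrative only: plain threads and a Mutex, not the Workers gem.
counter = 0
lock = Mutex.new

threads = 10.times.map do
  Thread.new do
    100.times do
      # Only one thread at a time may run this block.
      lock.synchronize { counter += 1 }
    end
  end
end

threads.each(&:join)
puts counter
```

Keeping the synchronized block as small as possible lets the rest of each task still run in parallel.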
Options
group = Workers::TaskGroup.new(
  :logger => nil,       # Ruby logger instance.
  :pool => Workers.pool # The workers pool used to execute the tasks.
)
task = Workers::Task.new(
  :logger => nil,      # Ruby logger instance.
  :perform => proc {}, # Required option. Block of code to run.
  :args => [],         # Array of arguments passed to the 'perform' block.
  :finished => nil,    # Callback to execute after attempting to run the task.
  :max_tries => 1      # Number of times to try completing the task (without an exception).
)
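As a rough mental model of how these task options interact, the following plain-Ruby sketch runs a 'perform' proc with 'args', retries up to 'max_tries', records the outcome, and then calls 'finished'. `SketchTask` is a hypothetical class written for this illustration. It is not the gem's Task class, and details such as what is passed to the 'finished' callback are assumptions.

```ruby
# Hypothetical stand-in for illustration; not the gem's Workers::Task.
class SketchTask
  attr_reader :result, :exception, :tries, :max_tries

  def initialize(perform:, args: [], finished: nil, max_tries: 1)
    @perform = perform
    @args = args
    @finished = finished
    @max_tries = max_tries
    @tries = 0
    @succeeded = false
  end

  # Runs the task in the calling thread; returns true on success.
  def run
    begin
      @tries += 1
      @result = @perform.call(*@args)
      @exception = nil
      @succeeded = true
    rescue StandardError => e
      @exception = e
      retry if @tries < @max_tries
      @succeeded = false
    end
    # Assumption: the callback receives the task itself.
    @finished.call(self) if @finished
    @succeeded
  end

  def succeeded?
    @succeeded
  end

  def failed?
    !@succeeded
  end
end

task = SketchTask.new(:perform => proc { |a, b| a * b }, :args => [3, 4])
task.run
puts task.result
```

The point of the sketch is that a failed task keeps its exception instead of raising it, which is what lets the caller decide how to handle failures.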
Workers
Basic
The main purpose of the Worker class is to add an event system on top of Ruby's built-in Thread class. This greatly simplifies inter-thread communication.
# Initialize a worker pool.
pool = Workers::Pool.new
# Perform some work in the background.
100.times do
  pool.perform do
    sleep(rand(3))
    puts "Hello world from thread #{Thread.current.object_id}"
  end
end
# Tell the workers to shut down.
pool.shutdown do
  puts "Worker thread #{Thread.current.object_id} is shutting down."
end
# Wait for the workers to shut down.
pool.join
Advanced
The Worker class is designed to be customized through inheritance and its event system:
# Create a subclass that handles custom events.
class CustomWorker < Workers::Worker
  private
  def process_event(event)
    case event.command
    when :my_custom
      puts "Worker received custom event: #{event.data}"
      sleep(1)
    end
  end
end
# Create a pool that uses your custom worker class.
pool = Workers::Pool.new(:worker_class => CustomWorker)
# Tell the workers to do some work using custom events.
100.times do |i|
  pool.enqueue(:my_custom, i)
end
# Tell the workers to shut down.
pool.shutdown do
  puts "Worker thread #{Thread.current.object_id} is shutting down."
end
# Wait for the workers to shut down.
pool.join
Note that you can use custom workers without a pool. This effectively gives you direct access to a single event-driven thread.
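The idea of a single event-driven thread can be sketched with Ruby's built-in Queue. This is not the gem's Worker class: `SketchEvent`, `SketchWorker`, and the `:shutdown` sentinel are hypothetical names chosen for the illustration. Only the 'command' and 'data' fields mirror the event accessors used in the example above.

```ruby
# Illustrative only: one thread that processes events from a Queue.
SketchEvent = Struct.new(:command, :data)

class SketchWorker
  attr_reader :handled

  def initialize
    @queue = Queue.new
    @handled = []
    @thread = Thread.new { run }
  end

  # Called from any thread to send an event to the worker thread.
  def enqueue(command, data = nil)
    @queue << SketchEvent.new(command, data)
  end

  def join
    @thread.join
  end

  private

  # Event loop: blocks on the queue until an event arrives.
  def run
    loop do
      event = @queue.pop
      break if event.command == :shutdown
      process_event(event)
    end
  end

  # Subclasses could override this to handle custom commands.
  def process_event(event)
    @handled << [event.command, event.data]
  end
end

worker = SketchWorker.new
3.times { |i| worker.enqueue(:my_custom, i) }
worker.enqueue(:shutdown)
worker.join
puts worker.handled.inspect
```

All communication goes through the queue, so callers never touch the worker's internal state while it is running.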
Options
worker = Workers::Worker.new(
  :logger => nil,     # Ruby Logger instance.
  :input_queue => nil # Ruby Queue used for input events.
)
Pools
Pools allow a group of workers to share a work queue. The Workers gem has a default pool (Workers.pool) with 20 workers, so in most cases you won't need to create your own. Pools can be adjusted using the methods below:
# Create a pool.
pool = Workers::Pool.new
# Return the number of workers in the pool.
pool.size
# Increase the number of workers in the pool.
pool.expand(5)
# Decrease the number of workers in the pool.
pool.contract(5)
# Resize the pool size to a specific value.
pool.resize(20)
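What "sharing a work queue" means can be sketched in a few lines of plain Ruby. `SketchPool` is a hypothetical class for illustration, not the gem's Pool. It has a fixed size and omits resizing, logging, and custom worker classes.

```ruby
# Illustrative only: a fixed number of threads pulling jobs from one Queue.
class SketchPool
  def initialize(size: 4)
    @queue = Queue.new
    @threads = Array.new(size) { Thread.new { work_loop } }
  end

  def size
    @threads.size
  end

  # Queue a block; whichever thread is free next will run it.
  def perform(&block)
    @queue << block
  end

  # Queue one shutdown marker per thread, behind any pending jobs.
  def shutdown
    @threads.size.times { @queue << :shutdown }
  end

  def join
    @threads.each(&:join)
  end

  private

  def work_loop
    loop do
      job = @queue.pop
      break if job == :shutdown
      job.call
    end
  end
end

results = Queue.new
pool = SketchPool.new(size: 3)
10.times { |i| pool.perform { results << i * i } }
pool.shutdown
pool.join
squares = Array.new(results.size) { results.pop }.sort
puts squares.inspect
```

Because the queue is first-in, first-out, the shutdown markers are only reached after every job queued before them has been picked up.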
Options
pool = Workers::Pool.new(
  :size => 20,                     # Number of threads to create.
  :logger => nil,                  # Ruby Logger instance.
  :worker_class => Workers::Worker # Class of worker to use for this pool.
)
Timers
Timers provide a way to execute code in the future:
# Create a one shot timer that executes in 1.5 seconds.
timer = Workers::Timer.new(1.5) do
  puts 'Hello world'
end
# Create a periodic timer that loops infinitely or until 'cancel' is called.
timer = Workers::PeriodicTimer.new(1) do
  puts 'Hello world many times'
end
# Let the timer print some lines.
sleep(5)
# Shut down the timer.
timer.cancel
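The difference between one-shot, periodic, and cancelled timers can be illustrated with a deliberately naive sketch that gives each timer its own sleeping thread. `SketchTimer` is a hypothetical class and is not how the gem works; the gem's timers are fired by a scheduler that runs callbacks on a pool of workers.

```ruby
# Illustrative only: one sleeping thread per timer.
class SketchTimer
  def initialize(delay, repeat: false, &callback)
    @delay = delay
    @repeat = repeat
    @callback = callback
    @lock = Mutex.new
    @cancelled = false
    @thread = Thread.new { run }
  end

  # Prevents any future firing; a callback already running is not interrupted.
  def cancel
    @lock.synchronize { @cancelled = true }
  end

  def join
    @thread.join
  end

  private

  def cancelled?
    @lock.synchronize { @cancelled }
  end

  def run
    loop do
      sleep(@delay)
      break if cancelled?
      @callback.call
      break unless @repeat
    end
  end
end

fired = Queue.new

# Fires once after roughly 0.05 seconds.
one_shot = SketchTimer.new(0.05) { fired << :once }
one_shot.join

# Cancelled before its delay elapses, so the callback never runs.
cancelled = SketchTimer.new(0.2) { fired << :never }
cancelled.cancel
cancelled.join

puts fired.size
```

A thread per timer does not scale to many timers, which is one reason to centralize firing in a scheduler.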
Options
timer = Workers::Timer.new(1,
  :logger => nil,                  # Ruby logger instance.
  :repeat => false,                # Repeat the timer until 'cancel' is called.
  :scheduler => Workers.scheduler, # The scheduler that manages execution.
  :callback => nil                 # The proc to execute (provide this or a block, but not both).
)
timer = Workers::PeriodicTimer.new(1,
  :logger => nil,                  # Ruby logger instance.
  :scheduler => Workers.scheduler, # The scheduler that manages execution.
  :callback => nil                 # The proc to execute (provide this or a block, but not both).
)
Schedulers
Schedulers are what trigger timers to fire. The Workers gem has a default scheduler (Workers.scheduler), so in most cases you won't need to create your own. Schedulers execute timers using a pool of workers, so make sure your timer block is thread safe. You can create additional schedulers as necessary:
# Create a workers pool with a larger than default thread count (optional).
pool = Workers::Pool.new(:size => 100)
# Create a scheduler.
scheduler = Workers::Scheduler.new(:pool => pool)
# Create a timer that uses the above scheduler.
timer = Workers::Timer.new(1, :scheduler => scheduler) do
  puts 'Hello world'
end
# Wait for the timer to fire.
sleep(5)
# Shut down the scheduler.
scheduler.dispose
Options
scheduler = Workers::Scheduler.new(
  :logger => nil,            # Ruby logger instance.
  :pool => Workers::Pool.new # The workers pool used to execute timer callbacks.
)
Contributing
- Fork it
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create new Pull Request