Taskinator
A simple orchestration library for running complex processes or workflows in Ruby. Processes are defined using a simple DSL, where the sequences and tasks are defined. Processes can then queued for execution. Sequences can be sychronous or asynchronous, and the overall process can be monitored for completion or failure.
Processes and tasks are executed by background workers and you can use any one of the following gems:
The configuration and state of each process and their respective tasks is stored using Redis key/values.
Requirements
The latest MRI (2.1, 2.0) version. Other versions/VMs are untested but might work fine. MRI 1.9 is not supported.
Redis 2.4 or greater is required.
One of the following background worker queue gems: resque
, sidekiq
or delayed_job
.
Note: resque
or sidekiq
is recommended since they use Redis as a backing store as well.
Installation
Add this line to your application's Gemfile:
gem 'taskinator'
And then execute:
$ bundle
Or install it yourself as:
$ gem install taskinator
Usage
Definition
Start by creating a "process" module and extending Taskinator::Definition
.
require 'taskinator'
module MyProcess
extend Taskinator::Definition
end
Define the process, using the define_process
method.
module MyProcess
extend Taskinator::Definition
# defines a process
define_process do
end
end
Specify the tasks with their corresponding implementation methods, that make up the process, using the task
method and providing
a name
and method
to execute for the task.
module MyProcess
extend Taskinator::Definition
define_process do
task :first_work_step
task :second_work_step
end
def first_work_step
# TODO: supply implementation
end
def second_work_step
# TODO: supply implementation
end
end
More complex processes may define sequential or concurrent steps, using the sequential
and concurrent
methods respectively.
module MyProcess
extend Taskinator::Definition
define_process do
concurrent do
# these tasks will be executed concurrently
task :work_step_1
task :work_step_2
end
sequential do
# thes tasks will be executed sequentially
task :work_step_3
task :work_step_4
end
end
def work_step_1
# TODO: supply implementation
end
...
def work_step_N
# TODO: supply implementation
end
end
You can define data driven tasks using the for_each
method, which takes an iterator method as an argument.
The iterator method yields the items to produce a parameterized task for that item. Notice that the task method
takes a parameter in this case, which will be the item provided by the iterator.
module MyProcess
extend Taskinator::Definition
define_process do
for_each :yield_data_elements do
task :work_step
end
end
def yield_data_elements
# TODO: supply implementation to yield elements
yield 1
end
def work_step(data_element)
# TODO: supply implementation
end
end
Processes can be composed of other processes too:
module MySubProcessA
...
end
module MySubProcessB
...
end
module MyProcess
extend Taskinator::Definition
define_process do
sub_process MySubProcessA
sub_process MySubProcessB
end
end
Any combination or nesting of task
, sequential
, concurrent
and for_each
steps are possible. E.g.
module MyProcess
extend Taskinator::Definition
define_process do
for_each :data_elements do
task :work_step_begin
concurrent do
for_each :sub_data_elements do
task :work_step_all_at_once
end
end
sub_process MySubProcess
sequential do
for_each :sub_data_elements do
task :work_step_one_by_one
end
end
task :work_step_end
end
end
# "task" and "iterator" methods omitted for brevity
end
In this example, the work_step_begin
is executed, followed by the work_step_all_at_once
steps which are executed concurrently, then
the sub process MySubProcess
is created and executed, followed by the work_step_one_by_one
tasks which are executed sequentially and
finally the work_step_end
is executed.
Execution
A process is executed by calling the generated create_process
method on your "process" module.
process = MyProcess.create_process
process.enqueue!
Arguments
TBD
Monitoring
To monitor the state of the processes, use the Taskinator::Api::Processes
class. This is still a work in progress.
processes = Taskinator::Api::Processes.new()
processes.each do |process|
# => output the unique process identifier and current state
puts [:process, process.uuid, process.current_state.name]
end
Configuration
Redis
By default Taskinator assumes Redis is located at localhost:6397
. This is fine for development, but for many production environments
you will need to poiint to an external Redis server. You may also what to use a namespace for the Redis keys.
NOTE: The configuration hash must have symbolized keys.
Taskinator.configure do |config|
config.redis = {
:url => 'redis://redis.example.com:7372/12',
:namespace => 'mynamespace'
}
end
Or, alternatively, via an ENV
variable
Set the REDIS_PROVIDER
environment variable to the Redis server url.
E.g. On Heroku, with RedisGreen: set REDIS_PROVIDER=REDISGREEN_URL and Taskinator will use the value of the REDISGREEN_URL
environment variable when connecting to Redis.
You may also use the generic REDIS_URL
which may be set to your own private Redis server.
The Redis configuration leverages the same setup as sidekiq
. For advanced options, checkout the
Sidekiq Advanced Options wiki for more information.
Queues
By default the queue names for process and task workers is default
, however, you can specify the queue names as follows:
Taskinator.configure do |config|
config.queue_config = {
:process_queue => :default,
:task_queue => :default
}
end
Notes
The persistence logic is decoupled from the implementation, so it is possible to implement another backing store if required.
Contributing
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request
License
MIT Copyright (c) 2014 Chris Stefano
Portions of code are from the Sidekiq project, Copyright (c) Contributed Systems LLC.
Inspiration
Inspired by the sidekiq and workflow gems.
For other workflow solutions, checkout Stonepath, the now deprecated ruote gem and workflow. Alternatively, for a robust enterprise ready solution checkout the AWS Flow Framework for Ruby.