Wolflow
Wolflow is a workflow engine that implements workflow patterns in Ruby.
Installation
Add this line to your application's Gemfile:
gem 'wolflow'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install wolflow
Usage
wolflow exposes the workflow patterns' building blocks as objects. As a user, you first build a specification of your workflow (the "workflow spec") as a series of linked "task specs"; then you execute it and monitor its progress using a "workflow":
require "wolflow"
include Wolflow
class MathSpec < Simple
def on_complete(task)
task.workflow.data[:tasks] += 1
puts("do math exercises")
super
end
end
class ScienceSpec < Simple
def on_complete(task)
task.workflow.data[:tasks] += 1
puts("do science exercises")
super
end
end
# build homework specification
wspec = WorkflowSpec.new(id: "My Homework")
# first do math, then do science homework
wspec.connect(MathSpec.new(id: "math")).connect(ScienceSpec.new(id: "science"))
# prepare workflow instance
wflow = Workflow.new(id: "homework", workflow_spec: wspec)
wflow.data[:tasks] = 0
# run a task each second
while wflow.complete_one
sleep(1)
end
# or run all tasks at once
wflow.complete_all
#=> "do math exercises"
#=> "do science exercises"
puts wflow.data
#=> {:tasks=>2}
Workflow specs, as well as workflows, are serializable, which is convenient for storing them or for transferring their representations across programs:
require "json"
# convert wspec to hash first, then dump to json
spec_dump = JSON.dump(Hash[wspec])
#=> "{\"id\":\"My Homework\",\"task_specs\":[{\"id\":\"math\",\"name\":\"math_spec\",\"next_tasks\":[\"science\"]},{\"id\":\"science\",\"name\":\"science_spec\",\"next_tasks\":[]}]}"
loaded_spec = WorkflowSpec.from_hash(JSON.parse(spec_dump, symbolize_names: true))
#=> #<Wolflow::WorkflowSpec:0x000000012899e340 @id="My Homework", @....
wf_dump = JSON.dump(Hash[wflow])
#=> "{\"id\":\"homework\",\"workflow_spec\":{\"id\":\"My Homework\",\"task_specs\":[{\"id\":\"math\",\"name\":\"math_spec\",\"next_tasks\":[\"science\"]},{\"id\":\"science\",\"name\":\"science_spec\",...
loaded_wf = Workflow.from_hash(JSON.parse(wf_dump, symbolize_names: true))
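The `symbolize_names: true` option in the calls above matters because JSON has no symbol type. The following is a minimal sketch using only the standard library and a plain hash standing in for a serialized spec (the hash contents are illustrative, not actual wolflow output):

```ruby
require "json"

# Plain hash standing in for a serialized spec (illustrative data only).
spec_hash = { id: "My Homework", task_specs: [{ id: "math", next_tasks: ["science"] }] }

dump = JSON.dump(spec_hash)

# Without symbolize_names, keys come back as strings...
string_keys = JSON.parse(dump)
# ...with it, nested keys become symbols again and the hash round-trips unchanged.
symbol_keys = JSON.parse(dump, symbolize_names: true)

puts string_keys.key?("id")   # true
puts symbol_keys == spec_hash # true
```

This is presumably why the examples above parse with `symbolize_names: true` before calling `from_hash`.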
Patterns
Here are a few snippets showing how to apply the workflow patterns supported by wolflow. For simplicity, assume that all snippets are preceded by:
require "wolflow"
include Wolflow
and can be loaded into a workflow using:
wf = Workflow.new(id: "test", workflow_spec: spec)
# and, if applicable:
wf.complete_all
Sequence
- http://www.workflowpatterns.com/patterns/control/basic/wcp1.php
- A task in a process is enabled after the completion of a preceding task in the same process.
spec = WorkflowSpec.new(id: "sequence")
spec.connect(Simple.new(id: "one")).connect(Simple.new(id: "two")) #=> returns the simple task with id "two"
Parallel Split
- http://www.workflowpatterns.com/patterns/control/basic/wcp2.php
- The divergence of a branch into two or more parallel branches each of which execute concurrently.
spec = WorkflowSpec.new(id: "parallel_split")
task_one = spec.connect(Simple.new(id: "one"))
task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3")) #=> returns parallel tasks
Synchronization
- http://www.workflowpatterns.com/patterns/control/basic/wcp3.php
- The convergence of two or more branches into a single subsequent branch such that the thread of control is passed to the subsequent branch when all input branches have been enabled.
spec = WorkflowSpec.new(id: "synchronization")
task_one = spec.connect(Simple.new(id: "one"))
parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))
sync_task = Synchronization.new(id: "sync_tasks")
sync_task.join(*parallel_tasks)
Exclusive Choice
- http://www.workflowpatterns.com/patterns/control/basic/wcp4.php
- The divergence of a branch into two or more branches such that when the incoming branch is enabled, the thread of control is immediately passed to precisely one of the outgoing branches based on a mechanism that can select one of the outgoing branches.
spec = WorkflowSpec.new(id: "exclusive_choice")
task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
else_spec = Simple.new(id: "else")
choice_spec, _ = task_one.choose(else_spec, id: "choice")
# equivalent to:
# choice_spec = ExclusiveChoice.new(id: "choice", else_tasks: [else_spec])
choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
Simple Merge
- http://www.workflowpatterns.com/patterns/control/basic/wcp5.php
- The convergence of two or more branches into a single subsequent branch such that each enablement of an incoming branch results in the thread of control being passed to the subsequent branch.
spec = WorkflowSpec.new(id: "simple_merge")
task_one = spec.connect(Simple.new(id: "one"))
parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))
merge_task = SimpleMerge.new(id: "merge_tasks")
merge_task.join(*parallel_tasks)
Multi Choice
- http://www.workflowpatterns.com/patterns/control/advanced_branching/wcp6.php
- The divergence of a branch into two or more branches such that when the incoming branch is enabled, the thread of control is immediately passed to one or more of the outgoing branches based on a mechanism that selects one or more outgoing branches.
spec = WorkflowSpec.new(id: "multi_choice")
task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
if2_spec = Simple.new(id: "if2")
choice_spec = MultiChoice.new(id: "choice")
choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
choice_spec.connect(->(_task) { Time.now.hour < 7 }, if2_spec)
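The difference between Exclusive Choice and Multi Choice comes down to how many matching branches are followed. The sketch below illustrates this in plain Ruby, without wolflow; the `branches` table and both helper methods are hypothetical, and picking the first matching branch is an assumption of the sketch rather than documented wolflow behaviour:

```ruby
# Hypothetical branch table: each entry pairs a condition with a branch name.
branches = [
  [->(hour) { hour < 13 }, "if"],
  [->(hour) { hour < 7 }, "if2"]
]

# Exclusive choice: exactly one branch is taken (first match, else the fallback).
def exclusive_choice(branches, hour, fallback)
  match = branches.find { |cond, _name| cond.call(hour) }
  match ? match.last : fallback
end

# Multi choice: every branch whose condition holds is taken.
def multi_choice(branches, hour)
  branches.select { |cond, _name| cond.call(hour) }.map(&:last)
end

p exclusive_choice(branches, 6, "else")  #=> "if"
p exclusive_choice(branches, 15, "else") #=> "else"
p multi_choice(branches, 6)              #=> ["if", "if2"]
p multi_choice(branches, 10)             #=> ["if"]
```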
Structured Synchronized Merge
- http://www.workflowpatterns.com/patterns/control/advanced_branching/wcp7.php
- The Structured Synchronizing Merge occurs in a structured context, i.e. there must be a single Multi-Choice construct earlier in the process model with which the Structured Synchronizing Merge is associated and it must merge all of the branches emanating from the Multi-Choice. These branches must either flow from the Structured Synchronizing Merge without any splits or joins or they must be structured in form (i.e. balanced splits and joins).
spec = WorkflowSpec.new(id: "structured_synchronized_merge")
task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
if2_spec = Simple.new(id: "if2")
choice_spec = MultiChoice.new(id: "choice")
choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
choice_spec.connect(->(_task) { Time.now.hour < 7 }, if2_spec)
merge_task = StructuredSynchronizingMerge.new(id: "merge_tasks")
merge_task.join(if_spec, if2_spec) #=> would raise error unless all arguments are branched from the same multi choice spec
Multi Merge
- http://www.workflowpatterns.com/patterns/control/advanced_branching/wcp8.php
- The convergence of two or more branches into a single subsequent branch such that each enablement of an incoming branch results in the thread of control being passed to the subsequent branch.
spec = WorkflowSpec.new(id: "multi_merge")
task_one = spec.connect(Simple.new(id: "one"))
parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))
merge_task = MultiMerge.new(id: "merge_tasks")
merge_task.join(*parallel_tasks)
Arbitrary Cycles
- http://www.workflowpatterns.com/patterns/control/structural/wcp10.php
- The ability to represent cycles in a process model that have more than one entry or exit point. It must be possible for individual entry and exit points to be associated with distinct branches.
spec = WorkflowSpec.new(id: "cycle")
task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
cycle = task_two.connect(Cycle.new(id: "cycle"))
cycle.connect(task_one)
Structured Loop
- http://www.workflowpatterns.com/patterns/control/new/wcp21.php
- The ability to execute a task or sub-process repeatedly. The loop has either a pre-test or post-test condition associated with it that is either evaluated at the beginning or end of the loop to determine whether it should continue.
spec = WorkflowSpec.new(id: "structured_loop")
task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
structured_loop = task_two.connect(StructuredLoop.new(id: "structured_loop"))
structured_loop.connect(->(_) { Time.now.hour < 13 }, task_one)
structured_loop.connect_else(Simple.new(id: "three"))
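The pattern description distinguishes pre-test from post-test loops. The snippet above places the loop check after tasks "one" and "two", which reads like a post-test loop. The difference can be sketched in plain Ruby, independent of wolflow (both method names are hypothetical):

```ruby
# Pre-test loop: the condition is checked before each pass,
# so the body may run zero times.
def pre_test_loop(limit)
  runs = 0
  runs += 1 while runs < limit
  runs
end

# Post-test loop: the body runs once before the condition is first checked.
def post_test_loop(limit)
  runs = 0
  loop do
    runs += 1
    break unless runs < limit
  end
  runs
end

p pre_test_loop(0)  #=> 0
p post_test_loop(0) #=> 1
p pre_test_loop(3)  #=> 3
p post_test_loop(3) #=> 3
```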
Recursion
- http://www.workflowpatterns.com/patterns/control/new/wcp22.php
- The ability of a task to invoke itself during its execution or an ancestor in terms of the overall decomposition structure with which it is associated.
spec = WorkflowSpec.new(id: "recursion")
task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
recursion = task_two.connect(Recursion.new(id: "recursion"))
recursion.connect(task_one)
Operators
In the examples above, ruby lambdas are used to showcase the selection mechanism for deciding whether a given conditional branch is followed:
# ...
choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
This is not the only option, though; the only requirement for a selection mechanism is that it responds to #call(?Task):
class IsBeforeLunch
def self.call(_task)
Time.now.hour < 13
end
end
choice_spec.connect(IsBeforeLunch, if_spec)
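Since only `#call` is required, several kinds of Ruby objects qualify. Here is a minimal sketch in plain Ruby, where `branch_taken?` is a hypothetical stand-in for whatever evaluates a condition, and a plain hash stands in for a task:

```ruby
# Hypothetical stand-in for the code that evaluates a branch condition:
# it only relies on the condition responding to #call.
def branch_taken?(condition, task)
  condition.call(task)
end

class HasTitle
  def self.call(task)
    task.key?(:title)
  end
end

module Checks
  def self.urgent?(task)
    task[:priority] == :high
  end
end

task = { title: "math", priority: :high } # plain hash standing in for a task

conditions = [
  ->(t) { t[:title] == "math" }, # lambda
  HasTitle,                      # class with a singleton #call
  Checks.method(:urgent?)        # Method object
]

p conditions.map { |c| branch_taken?(c, task) } #=> [true, true, true]
```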
Neither option is enough, though, if you need to serialize workflows: lambdas and singleton classes can't be marshalled into hashes.
wolflow ships with operators: low-level, hash-serializable primitives that describe the kind of expressions most commonly found in conditionals.
There are 3 basic operators: "literals" (plain values), "attributes" (variables pointing at data in the workflow), and "operations" (which apply a reducer function to a sequence of arguments). Here are a few examples:
require "wolflow"
include Wolflow
lit = Operators::Literal.new(value: 3) #=> 3, serializes to { type: "literal", value: 3 }
lit2 = Operators::Literal.new(value: 4)
var = Operators::Attribute.new(name: "foo") #=> "foo", serializes to { type: "attribute", name: "foo" }
Operators::Operation.new(op: "+", members: [lit, lit2, var]) #=> "3 + 4 + foo", serializes to { type: "operation", op: "+", members: [{ type: "literal", value: 3 }, { type: "literal", value: 4 }, { type: "attribute", name: "foo" }] }
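To make the idea of a hash-serializable expression concrete, here is a minimal sketch of an evaluator over plain hashes shaped like the serialized forms above. It does not use wolflow and is not its implementation; the `evaluate` method and the `data` argument (standing in for workflow data) are hypothetical:

```ruby
require "json"

# Hypothetical evaluator over plain hashes shaped like the serialized forms above.
# `data` stands in for the workflow data an attribute would point at.
def evaluate(node, data)
  case node[:type]
  when "literal"   then node[:value]
  when "attribute" then data.fetch(node[:name].to_sym)
  when "operation"
    # Reduce the evaluated members with the operator, e.g. 3 + 4 + foo.
    node[:members].map { |m| evaluate(m, data) }.reduce(node[:op].to_sym)
  else
    raise ArgumentError, "unknown node type: #{node[:type].inspect}"
  end
end

expr = {
  type: "operation", op: "+",
  members: [
    { type: "literal", value: 3 },
    { type: "literal", value: 4 },
    { type: "attribute", name: "foo" }
  ]
}

p evaluate(expr, { foo: 5 }) #=> 12

# Unlike a lambda, the expression survives a JSON round trip.
restored = JSON.parse(JSON.dump(expr), symbolize_names: true)
p evaluate(restored, { foo: 5 }) #=> 12
```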
Then there is "base", which the other operators extend, and which can also be subclassed to extend the operator language.
# example of a custom class calculating the Levenshtein distance using the gem:
require "levenshtein"
class LevenshteinDistance < Operators::Base
# (strings: Array[Operators::Base], threshold: Float, **untyped) -> void
def initialize(strings:, threshold: 0.8, **args)
@strings = strings
@threshold = threshold
super(**args)
end
# (Task task) -> Float
def call(task)
string1, string2 = @strings.map { |s| s.call(task) }
Levenshtein.normalized_distance(string1, string2, @threshold)
end
def to_hash
hs = super
hs[:threshold] = @threshold.to_hash
hs[:strings] = @strings.map(&:to_hash)
hs
end
def self.from_hash(hash)
case hash
in { threshold: Hash => threshold, strings: [*, Hash, *] => strings, ** }
super(hash.merge(threshold: Operators.from_hash(threshold), strings: strings.map{ |s| Operators.from_hash(s) }))
else
super
end
end
end
op = Operators.from_hash({ type: "levenshtein_distance", threshold: { type: "literal", value: 0.8}, strings: [{ type: "literal", value: "test_string"}, {type: "attribute", name: "foo"}]})
#=>
#<LevenshteinDistance:0x000000012a796960 ...
op.call(Task.new(data: { foo: "bar" })) #=> calculates levenshtein distance between "test_string" and "bar"
Design Concepts
wolflow is designed to provide workflow engine primitives as plain Ruby objects, with a simple yet extensive API for defining, extending, and executing workflows. It is also defined by what it explicitly does not support:
- No custom business language to build workflows. It's impossible to provide such a language which fits all use cases. It's however possible to design your own language on top of it.
- Does not support standard workflow representations (like BPMN) out-of-the-box. However, it's possible to build support on top of it.
- No storage support. There are too many options to consider in this space. However, every object encodes to and decodes from Ruby hashes, so storing them in, e.g., a database with JSON support should be fairly easy.
- No execution engine. You may want to distribute task execution to thread pools, separate processes, background jobs.
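The last point leaves the execution strategy to the application. As one possible approach, the following sketch distributes work across a small thread pool using only Ruby's core `Queue` and `Thread`. The lambdas are hypothetical stand-ins for task execution; nothing here is wolflow API:

```ruby
# Hypothetical work items: any object responding to #call stands in for a task.
jobs = Queue.new
%w[math science history].each { |name| jobs << -> { "#{name} done" } }
jobs.close # popping from a closed, empty queue returns nil instead of blocking

results = Queue.new
workers = Array.new(2) do
  Thread.new do
    # Each worker keeps pulling jobs until the queue is drained.
    while (job = jobs.pop)
      results << job.call
    end
  end
end
workers.each(&:join)

done = []
done << results.pop until results.empty?
p done.sort #=> ["history done", "math done", "science done"]
```

The same shape applies when the workers are separate processes or background jobs; only the transport between producer and workers changes.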
Related Links/Projects
- Workflow Patterns documentation
- flor, a ruby workflow engine with its own custom workflow definition "language", with built-in storage and execution mechanisms.
- Spiff Workflow, a python workflow engine allowing in-code workflow definitions, with BPMN and DMN support.
- Camunda, a workflow engine which is most commonly used "standalone", but can also be embedded in a Java (Spring Boot) application.
You can also find other interesting resources about workflow engines here
Development
After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.
Contributing
Bug reports and merge requests are welcome on GitLab at https://gitlab.com/os85/wolflow.