Wolflow is a workflow engine which implements workflow patterns in ruby.


Add this line to your application's Gemfile:

gem 'wolflow'

And then execute:

$ bundle install

Or install it yourself as:

$ gem install wolflow


wolflow exposes the workflow patterns building blocks as objects. As a user, you first build a specification of your workflow (the "workflow spec") as a series of linked "task specs"; then you execute and monitor its progress using a "workflow":

require "wolflow"

include Wolflow

class MathSpec < Simple
  def on_complete(task)
    task.workflow.data[:tasks] += 1
    puts("do math exercises")
  end
end

class ScienceSpec < Simple
  def on_complete(task)
    task.workflow.data[:tasks] += 1
    puts("do science exercises")
  end
end

# build homework specification
wspec = WorkflowSpec.new(id: "My Homework")

# first do math, then do science homework
wspec.connect(MathSpec.new(id: "math")).connect(ScienceSpec.new(id: "science"))

# prepare workflow instance
wflow = Workflow.new(id: "homework", workflow_spec: wspec)
wflow.data[:tasks] = 0

# run a task each second
while wflow.complete_one
  sleep(1)
end

# or run all tasks at once
#=> "do math exercises"
#=> "do science exercises"
puts wflow.data
#=> {:tasks=>2}

Workflow Specs, as well as Workflows, are serializable, which can be convenient for storage or for transferring representations across programs:

require "json"

# convert wspec to hash first, then dump to json
spec_dump = JSON.dump(Hash[wspec])
#=> "{\"id\":\"My Homework\",\"task_specs\":[{\"id\":\"math\",\"name\":\"math_spec\",\"next_tasks\":[\"science\"]},{\"id\":\"science\",\"name\":\"science_spec\",\"next_tasks\":[]}]}"
loaded_spec = WorkflowSpec.from_hash(JSON.parse(spec_dump, symbolize_names: true))
#=>  #<Wolflow::WorkflowSpec:0x000000012899e340 @id="My Homework", @....

wf_dump = JSON.dump(Hash[wflow])
#=> "{\"id\":\"homework\",\"workflow_spec\":{\"id\":\"My Homework\",\"task_specs\":[{\"id\":\"math\",\"name\":\"math_spec\",\"next_tasks\":[\"science\"]},{\"id\":\"science\",\"name\":\"science_spec\",...
loaded_wf = Workflow.from_hash(JSON.parse(wf_dump, symbolize_names: true))
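
The `symbolize_names: true` option matters because JSON object keys are always strings, while the examples above pass symbol-keyed hashes to `from_hash`. A minimal sketch of the round trip, using only the standard library and a plain hash shaped like the dump above (illustrative data, no wolflow classes involved):

```ruby
require "json"

# A plain hash shaped like the spec dump shown above (illustrative data only).
spec_hash = {
  id: "My Homework",
  task_specs: [
    { id: "math", name: "math_spec", next_tasks: ["science"] },
    { id: "science", name: "science_spec", next_tasks: [] }
  ]
}

dumped = JSON.dump(spec_hash)

# Without symbolize_names, keys come back as strings.
string_keyed = JSON.parse(dumped)
# With symbolize_names, the original symbol-keyed hash is restored.
symbol_keyed = JSON.parse(dumped, symbolize_names: true)

puts string_keyed.key?("id")    #=> true
puts string_keyed.key?(:id)     #=> false
puts symbol_keyed == spec_hash  #=> true
```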


Here are a few snippets showing how to apply the workflow patterns supported by wolflow. For simplicity, assume that all snippets are preceded by:

require "wolflow"

include Wolflow

and can be loaded into a workflow using:

wf = Workflow.new(id: "test", workflow_spec: spec)
# and, if applicable:


Sequence

spec = WorkflowSpec.new(id: "sequence")

spec.connect(Simple.new(id: "one")).connect(Simple.new(id: "two")) #=> returns the simple task with id "two"

Parallel Split

spec = WorkflowSpec.new(id: "parallel_split")

task_one = spec.connect(Simple.new(id: "one"))

task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3")) #=> returns parallel tasks


Synchronization

spec = WorkflowSpec.new(id: "synchronization")

task_one = spec.connect(Simple.new(id: "one"))

parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))

sync_task = Synchronization.new(id: "sync_tasks")


Exclusive Choice

  • http://www.workflowpatterns.com/patterns/control/basic/wcp4.php
  • The divergence of a branch into two or more branches such that when the incoming branch is enabled, the thread of control is immediately passed to precisely one of the outgoing branches based on a mechanism that can select one of the outgoing branches.

spec = WorkflowSpec.new(id: "exclusive_choice")

task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
else_spec = Simple.new(id: "else")

choice_spec, _ = task_one.choose(else_spec, id: "choice")
# equivalent to:
# choice_spec = ExclusiveChoice.new(id: "choice", else_tasks: [else_spec])

choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)

Simple Merge

spec = WorkflowSpec.new(id: "simple_merge")

task_one = spec.connect(Simple.new(id: "one"))

parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))

merge_task = SimpleMerge.new(id: "merge_tasks")


Multi Choice

spec = WorkflowSpec.new(id: "multi_choice")

task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
if2_spec = Simple.new(id: "if2")

choice_spec = MultiChoice.new(id: "choice")

choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
choice_spec.connect(->(_task) { Time.now.hour < 7 }, if2_spec)
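
The difference between the two choice patterns can be sketched in plain ruby, without wolflow: an exclusive choice follows exactly one branch, while a multi choice follows every branch whose condition holds. The helper names `exclusive_choice` and `multi_choice`, the `hour` argument, and the "first matching branch wins" rule are assumptions made for this illustration; they are not part of the wolflow API.

```ruby
# Each branch pairs a condition (anything responding to #call) with a label.
branches = [
  [->(hour) { hour < 13 }, "if"],
  [->(hour) { hour < 7 }, "if2"]
]

# Exclusive choice: exactly one outgoing branch is taken.
# Assumption: the first matching branch wins, else the fallback is used.
def exclusive_choice(branches, else_branch, hour)
  match = branches.find { |condition, _label| condition.call(hour) }
  match ? match.last : else_branch
end

# Multi choice: every branch whose condition holds is taken.
def multi_choice(branches, hour)
  branches.select { |condition, _label| condition.call(hour) }.map(&:last)
end

p exclusive_choice(branches, "else", 6)  #=> "if"
p exclusive_choice(branches, "else", 15) #=> "else"
p multi_choice(branches, 6)              #=> ["if", "if2"]
p multi_choice(branches, 10)             #=> ["if"]
```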

Structured Synchronizing Merge

  • http://www.workflowpatterns.com/patterns/control/advanced_branching/wcp7.php
  • The Structured Synchronizing Merge occurs in a structured context, i.e. there must be a single Multi-Choice construct earlier in the process model with which the Structured Synchronizing Merge is associated and it must merge all of the branches emanating from the Multi-Choice. These branches must either flow from the Structured Synchronizing Merge without any splits or joins or they must be structured in form (i.e. balanced splits and joins).

spec = WorkflowSpec.new(id: "structured_synchronized_merge")

task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
if2_spec = Simple.new(id: "if2")

choice_spec = MultiChoice.new(id: "choice")

choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
choice_spec.connect(->(_task) { Time.now.hour < 7 }, if2_spec)

merge_task = StructuredSynchronizingMerge.new(id: "merge_tasks")

merge_task.join(if_spec, if2_spec) #=> would raise error unless all arguments are branched from the same multi choice spec

Multi Merge

spec = WorkflowSpec.new(id: "multi_merge")

task_one = spec.connect(Simple.new(id: "one"))

parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))

merge_task = MultiMerge.new(id: "merge_tasks")


Arbitrary Cycles

spec = WorkflowSpec.new(id: "cycle")

task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
cycle = task_two.connect(Cycle.new(id: "cycle"))

Structured Loop

spec = WorkflowSpec.new(id: "structured_loop")

task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
structured_loop = task_two.connect(StructuredLoop.new(id: "structured_loop"))
structured_loop.connect(->(_) { Time.now.hour < 13 }, task_one)
structured_loop.connect_else(Simple.new(id: "three"))


Recursion

spec = WorkflowSpec.new(id: "recursion")

task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
recursion = task_two.connect(Recursion.new(id: "recursion"))


In the examples above, ruby lambdas are used to showcase the selection mechanism for deciding whether a given conditional branch is followed:

# ...
choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)

This is not the only option, though; the only requirement for a selection mechanism is that it responds to #call(?Task):

class IsBeforeLunch
  def self.call(_task)
    Time.now.hour < 13
  end
end

choice_spec.connect(IsBeforeLunch, if_spec)
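
Since only `#call` is required, other ruby callables should fit the same contract, such as instances of a class defining `call` or `Method` objects. The sketch below checks this duck typing in plain ruby; `BeforeHour`, `FixedClock`, and the `nil` stand-in for the task are hypothetical names used for illustration only, not part of wolflow.

```ruby
# An instance-based condition: configurable, unlike a hard-coded lambda.
class BeforeHour
  def initialize(hour, clock: Time)
    @hour = hour
    @clock = clock
  end

  def call(_task)
    @clock.now.hour < @hour
  end
end

# A fixed clock (09:00) keeps the example deterministic.
FixedClock = Struct.new(:now)
clock = FixedClock.new(Time.new(2024, 1, 1, 9, 0, 0))

conditions = [
  ->(_task) { clock.now.hour < 13 },             # lambda
  BeforeHour.new(13, clock: clock),              # object responding to #call
  BeforeHour.new(7, clock: clock).method(:call)  # Method object
]

# nil stands in for the task argument here.
p conditions.map { |condition| condition.call(nil) } #=> [true, true, false]
```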

Neither option is enough, though, if you need to serialize workflows (lambdas and singleton classes can't be marshalled into hashes).
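
This limitation is easy to reproduce with the standard library alone. In the sketch below, ruby's `Marshal` refuses to dump both a lambda and an object carrying a singleton method; the variable names are illustrative and nothing here uses wolflow.

```ruby
is_before_lunch = ->(_task) { Time.now.hour < 13 }

# An object with a singleton method, standing in for an ad-hoc condition.
ad_hoc = Object.new
def ad_hoc.call(_task)
  Time.now.hour < 13
end

# Collect the error message for each condition that cannot be dumped
# (nil would mean the dump succeeded).
errors = [is_before_lunch, ad_hoc].map do |condition|
  Marshal.dump(condition)
  nil
rescue TypeError => e
  e.message
end

puts errors
```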

wolflow ships with operators: low-level, hash-serializable primitives that describe the kinds of expression one would most commonly expect in conditionals.

There are 3 basic operators: "literals" (plain values), "attributes" (variables pointing at data in the workflow), and "operations" (which apply a reducer function to a sequence of arguments). Here are a few examples:

require "wolflow"

include Wolflow

lit = Operators::Literal.new(value: 3) #=> 3, serializes to { type: "literal", value: 3 }

lit2 = Operators::Literal.new(value: 4)
var = Operators::Attribute.new(name: "foo") #=> "foo", serializes to { type: "attribute", name: "foo" }
Operators::Operation.new(op: "+", members: [lit, lit2, var]) #=> "3 + 4 + foo", serializes to { type: "operation", op: "+", members: [{ type: "literal", value: 3 }, { type: "literal", value: 4 }, { type: "attribute", name: "foo" }] }
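
To make the evaluation model concrete, here is a minimal, self-contained sketch of how such operators could evaluate against workflow data and serialize to hashes. The `Sketch` module and its classes are stand-ins written for this illustration, and they operate on a plain data hash rather than a task; this is not wolflow's implementation.

```ruby
module Sketch
  class Literal
    def initialize(value:)
      @value = value
    end

    # A literal evaluates to itself, whatever the data is.
    def call(_data)
      @value
    end

    def to_hash
      { type: "literal", value: @value }
    end
  end

  class Attribute
    def initialize(name:)
      @name = name
    end

    # Looks the variable up in the data hash.
    def call(data)
      data.fetch(@name.to_sym)
    end

    def to_hash
      { type: "attribute", name: @name }
    end
  end

  class Operation
    def initialize(op:, members:)
      @op = op
      @members = members
    end

    # Evaluates every member, then reduces the values with the operator.
    def call(data)
      @members.map { |member| member.call(data) }.reduce(@op.to_sym)
    end

    def to_hash
      { type: "operation", op: @op, members: @members.map(&:to_hash) }
    end
  end
end

sum = Sketch::Operation.new(
  op: "+",
  members: [
    Sketch::Literal.new(value: 3),
    Sketch::Literal.new(value: 4),
    Sketch::Attribute.new(name: "foo")
  ]
)

p sum.call({ foo: 5 }) #=> 12
p sum.to_hash
```

Because every node only exposes `call` and `to_hash`, the whole expression tree stays a plain nested hash once serialized, which is what makes it storable where a lambda is not.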

Finally, there is "base", which the other operators extend, and which can also be used to extend the operator language.

# example of a custom operator class calculating levenshtein distance using the levenshtein gem:

require "levenshtein"

class LevenshteinDistance < Operators::Base
  # (strings: Array[Operators::Base], threshold: Float, **untyped) -> void
  def initialize(strings:, threshold: 0.8, **args)
    @strings = strings
    @threshold = threshold
  end

  # (Task task) -> Float
  def call(task)
    string1, string2 = @strings.map { |s| s.call(task) }

    Levenshtein.normalized_distance(string1, string2, @threshold)
  end

  def to_hash
    hs = super
    hs[:threshold] = @threshold.to_hash
    hs[:strings] = @strings.map(&:to_hash)
    # return the completed hash
    hs
  end

  def self.from_hash(hash)
    case hash
    in { threshold: Hash => threshold, strings: [*, Hash, *] => strings, ** }
      super(hash.merge(threshold: Operators.from_hash(threshold), strings: strings.map { |s| Operators.from_hash(s) }))
    end
  end
end

op = Operators.from_hash({ type: "levenshtein_distance", threshold:  { type: "literal", value: 0.8}, strings: [{ type: "literal", value: "test_string"}, {type: "attribute", name: "foo"}]})
#<LevenshteinDistance:0x000000012a796960 ...

op.call(Task.new(data: { foo: "bar" })) #=> calculates levenshtein distance between "test_string" and "bar"

Design Concepts

wolflow is designed to provide workflow engine primitives as plain ruby objects, with a simple yet extensive API which allows defining, extending, and executing workflows. It's therefore defined by what it explicitly does not support.

  • No custom business language to build workflows. It's impossible to provide such a language which fits all use cases. It's however possible to design your own language on top of it.
  • Does not support standard workflow representations (like BPMN) out-of-the-box. However, it's possible to build support on top of it.
  • No storage support. There are too many options to consider in the space. However, every object encodes to / decodes from ruby hashes, so e.g. storing them in a database with JSON support should be fairly easy.
  • No execution engine. You may want to distribute task execution to thread pools, separate processes, or background jobs.

  • Workflow Patterns documentation
  • flor, a ruby workflow engine with its own custom workflow definition "language", with built-in storage and execution mechanisms.
  • Spiff Workflow, a python workflow engine allowing in-code workflow definitions, with BPMN and DMN support.
  • Camunda, a workflow engine which is most commonly used "standalone", but can also be embedded in a Java (Spring Boot) application.

You can also find other interesting resources about workflow engines here


After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.


Bug reports and pull requests are welcome on GitLab at https://gitlab.com/os85/wolflow.