Wolflow


Wolflow is a workflow engine that implements the workflow patterns in Ruby.

Installation

Add this line to your application's Gemfile:

gem 'wolflow'

And then execute:

$ bundle install

Or install it yourself as:

$ gem install wolflow

Usage

wolflow exposes the building blocks of the workflow patterns as objects. As a user, you first build a specification of your workflow (the "workflow spec") as a series of linked "task specs"; then you execute it and monitor its progress using a "workflow":

require "wolflow"

include Wolflow

class MathSpec < Simple
  def on_complete(task)
    task.workflow.data[:tasks] += 1
    puts("do math exercises")
    super
  end
end

class ScienceSpec < Simple
  def on_complete(task)
    task.workflow.data[:tasks] += 1
    puts("do science exercises")
    super
  end
end

# build homework specification
wspec = WorkflowSpec.new(id: "My Homework")

# first do math, then do science homework
wspec.connect(MathSpec.new(id: "math")).connect(ScienceSpec.new(id: "science"))

# prepare workflow instance
wflow = Workflow.new(id: "homework", workflow_spec: wspec)
wflow.data[:tasks] = 0

# run a task each second
while wflow.complete_one
  sleep(1)
end

# or run all tasks at once
wflow.complete_all
#=> "do math exercises"
#=> "do science exercises"
puts wflow.data
#=> {:tasks=>2}
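The split between the spec (a static graph of linked task specs) and the workflow (the run-time state of one execution) can be pictured with the plain-Ruby model below. It is only a sketch: `ToyTaskSpec` and `ToyWorkflow` are hypothetical names, not wolflow classes, and wolflow's actual scheduling is likely more involved.

```ruby
# Hypothetical stand-ins, not wolflow classes: a task spec is a node in a
# static graph, and a workflow holds the run-time state of one execution.
class ToyTaskSpec
  attr_reader :id, :next_tasks

  def initialize(id:, &action)
    @id = id
    @action = action # what to do when the task completes
    @next_tasks = []
  end

  # Link a successor and return it, so that calls can be chained.
  def connect(spec)
    @next_tasks << spec
    spec
  end

  def run(workflow)
    @action&.call(workflow)
  end
end

class ToyWorkflow
  attr_reader :data, :completed

  def initialize(start)
    @ready = [start] # task specs that can run next
    @completed = []  # ids of the tasks run so far, in order
    @data = {}
  end

  # Run the next ready task; return false once there is nothing left to run.
  def complete_one
    spec = @ready.shift
    return false unless spec

    spec.run(self)
    @completed << spec.id
    @ready.concat(spec.next_tasks)
    true
  end

  def complete_all
    loop { break unless complete_one }
  end
end

math = ToyTaskSpec.new(id: "math") { |wf| wf.data[:tasks] += 1 }
math.connect(ToyTaskSpec.new(id: "science") { |wf| wf.data[:tasks] += 1 })

wflow = ToyWorkflow.new(math)
wflow.data[:tasks] = 0
wflow.complete_all
p wflow.completed # => ["math", "science"]
```

Keeping the graph immutable and the progress in a separate object is what lets one spec back many concurrent workflow instances.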

Workflow specs, as well as workflows, are serializable, which is convenient for storage or for transferring representations across programs:

require "json"

# convert wspec to hash first, then dump to json
spec_dump = JSON.dump(Hash[wspec])
#=> "{\"id\":\"My Homework\",\"task_specs\":[{\"id\":\"math\",\"name\":\"math_spec\",\"next_tasks\":[\"science\"]},{\"id\":\"science\",\"name\":\"science_spec\",\"next_tasks\":[]}]}"
loaded_spec = WorkflowSpec.from_hash(JSON.parse(spec_dump, symbolize_names: true))
#=>  #<Wolflow::WorkflowSpec:0x000000012899e340 @id="My Homework", @....

wf_dump = JSON.dump(Hash[wflow])
#=> "{\"id\":\"homework\",\"workflow_spec\":{\"id\":\"My Homework\",\"task_specs\":[{\"id\":\"math\",\"name\":\"math_spec\",\"next_tasks\":[\"science\"]},{\"id\":\"science\",\"name\":\"science_spec\",...
loaded_wf = Workflow.from_hash(JSON.parse(wf_dump, symbolize_names: true))
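The round trip above relies on a simple convention: objects convert to plain hashes (`Hash[obj]` accepts any object responding to `to_hash`) and are rebuilt by a `from_hash` class method. The sketch below reproduces that convention on a hypothetical `ToySpec` class; it illustrates the idea and does not reproduce wolflow's actual hash layout.

```ruby
require "json"

# Hypothetical class following the to_hash / from_hash convention.
class ToySpec
  attr_reader :id, :task_ids

  def initialize(id:, task_ids: [])
    @id = id
    @task_ids = task_ids
  end

  # Implicit hash conversion: this is what makes Hash[spec] work.
  def to_hash
    { id: @id, task_ids: @task_ids }
  end

  # Rebuild from a hash with symbol keys, as returned by
  # JSON.parse(..., symbolize_names: true).
  def self.from_hash(hash)
    new(id: hash.fetch(:id), task_ids: hash.fetch(:task_ids))
  end
end

spec = ToySpec.new(id: "My Homework", task_ids: %w[math science])
dump = JSON.dump(Hash[spec])
loaded = ToySpec.from_hash(JSON.parse(dump, symbolize_names: true))
```

Passing `symbolize_names: true` matters: without it, the parsed keys are strings and `hash.fetch(:id)` would fail.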

Patterns

Here are a few snippets showing how to apply the workflow patterns supported by wolflow. For simplicity, assume that all snippets are preceded by:

require "wolflow"

include Wolflow

and can be loaded into a workflow using:

wf = Workflow.new(id: "test", workflow_spec: spec)
# and, if applicable:
wf.complete_all

Sequence

spec = WorkflowSpec.new(id: "sequence")

spec.connect(Simple.new(id: "one")).connect(Simple.new(id: "two")) #=> returns the simple task with id "two"

Parallel Split

spec = WorkflowSpec.new(id: "parallel_split")

task_one = spec.connect(Simple.new(id: "one"))

task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3")) #=> returns the parallel tasks

Synchronization

spec = WorkflowSpec.new(id: "synchronization")

task_one = spec.connect(Simple.new(id: "one"))

parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))

sync_task = Synchronization.new(id: "sync_tasks")

sync_task.join(*parallel_tasks)
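Synchronization (an AND-join) passes control on only once every joined branch has completed. The bookkeeping can be sketched with a hypothetical `ToySync` class; this illustrates the pattern and is not wolflow's implementation of `Synchronization`.

```ruby
require "set"

# Hypothetical AND-join, not wolflow's Synchronization class.
class ToySync
  def initialize(*branch_ids)
    @pending = Set.new(branch_ids) # branches that have not completed yet
    @done = false
  end

  # Record a completed incoming branch. Returns true exactly once, when the
  # last pending branch completes; unknown or repeated ids are ignored.
  def branch_completed(branch_id)
    return false if @done || @pending.delete?(branch_id).nil?

    @done = @pending.empty?
  end
end

sync = ToySync.new("parallel_1", "parallel_2", "parallel_3")
results = %w[parallel_2 parallel_1 parallel_3].map { |id| sync.branch_completed(id) }
# results == [false, false, true]
```

The order in which branches finish does not matter; only the last one triggers the join.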

Exclusive Choice

  • http://www.workflowpatterns.com/patterns/control/basic/wcp4.php
  • The divergence of a branch into two or more branches such that when the incoming branch is enabled, the thread of control is immediately passed to precisely one of the outgoing branches based on a mechanism that can select one of the outgoing branches.
spec = WorkflowSpec.new(id: "exclusive_choice")

task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
else_spec = Simple.new(id: "else")

choice_spec, _ = task_one.choose(else_spec, id: "choice")
# equivalent to:
# choice_spec = ExclusiveChoice.new(id: "choice", else_tasks: [else_spec])

choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
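Exclusive choice enables exactly one outgoing branch: the first whose condition holds, or the else branches when none does. The selection rule can be sketched in plain Ruby as below; `choose_branch` is a hypothetical helper, and evaluating conditions in the order they were connected is an assumption about wolflow, not something documented here.

```ruby
# Hypothetical helper, not wolflow's API: returns the single branch to follow.
# conditions is an ordered list of [callable, branch] pairs.
def choose_branch(task, conditions, else_branch)
  _, branch = conditions.find { |condition, _| condition.call(task) }
  branch || else_branch
end

conditions = [[->(_task) { Time.now.hour < 13 }, "if"]]
puts choose_branch(nil, conditions, "else") # prints "if" before 1pm, "else" otherwise
```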

Simple Merge

spec = WorkflowSpec.new(id: "simple_merge")

task_one = spec.connect(Simple.new(id: "one"))

parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))

merge_task = SimpleMerge.new(id: "merge_tasks")

merge_task.join(*parallel_tasks)

Multi Choice

spec = WorkflowSpec.new(id: "multi_choice")

task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
if2_spec = Simple.new(id: "if2")

choice_spec = task_one.connect(MultiChoice.new(id: "choice"))

choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
choice_spec.connect(->(_task) { Time.now.hour < 7 }, if2_spec)
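Unlike exclusive choice, multi-choice enables every branch whose condition holds, so zero, one, or several branches may run. With a hypothetical `chosen_branches` helper (not wolflow's API) and a fixed hour instead of the clock, the rule looks like this:

```ruby
# Hypothetical helper: returns every branch whose condition holds.
def chosen_branches(task, conditions)
  conditions.filter_map { |condition, branch| branch if condition.call(task) }
end

hour = 6
conditions = [
  [->(_task) { hour < 13 }, "if"],
  [->(_task) { hour < 7 }, "if2"]
]
p chosen_branches(nil, conditions) # => ["if", "if2"]
```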

Structured Synchronizing Merge

  • http://www.workflowpatterns.com/patterns/control/advanced_branching/wcp7.php
  • The Structured Synchronizing Merge occurs in a structured context, i.e. there must be a single Multi-Choice construct earlier in the process model with which the Structured Synchronizing Merge is associated and it must merge all of the branches emanating from the Multi-Choice. These branches must either flow from the Multi-Choice to the Structured Synchronizing Merge without any splits or joins or they must be structured in form (i.e. balanced splits and joins).
spec = WorkflowSpec.new(id: "structured_synchronizing_merge")

task_one = spec.connect(Simple.new(id: "one"))
if_spec = Simple.new(id: "if")
if2_spec = Simple.new(id: "if2")

choice_spec = task_one.connect(MultiChoice.new(id: "choice"))

choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)
choice_spec.connect(->(_task) { Time.now.hour < 7 }, if2_spec)

merge_task = StructuredSynchronizingMerge.new(id: "merge_tasks")

merge_task.join(if_spec, if2_spec) #=> raises an error unless all arguments branch from the same multi-choice spec
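What distinguishes this merge from plain synchronization is that it waits only for the branches the multi-choice actually enabled. The sketch below pairs both halves in one hypothetical `ToyChoiceAndMerge` class to make that dependency visible; it is not wolflow's API, where the association is established through `join`.

```ruby
require "set"

# Hypothetical sketch: the multi-choice half records which branches it
# enabled, and the merge half waits for exactly that set.
class ToyChoiceAndMerge
  def initialize(conditions)
    @conditions = conditions # ordered [callable, branch_id] pairs
    @pending = Set.new
  end

  # Multi-choice half: enable every branch whose condition holds.
  def choose(task)
    enabled = @conditions.filter_map { |condition, id| id if condition.call(task) }
    @pending = Set.new(enabled)
    enabled
  end

  # Merge half: record a completed branch; true once no enabled branch is pending.
  def branch_completed(branch_id)
    @pending.delete(branch_id)
    @pending.empty?
  end
end

hour = 10
pair = ToyChoiceAndMerge.new([
  [->(_task) { hour < 13 }, "if"],
  [->(_task) { hour < 7 }, "if2"]
])
enabled = pair.choose(nil)           # only "if" is enabled at 10am
merged = pair.branch_completed("if") # fires without waiting for "if2"
```

A plain AND-join over both branches would wait forever here, because "if2" never runs.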

Multi Merge

spec = WorkflowSpec.new(id: "multi_merge")

task_one = spec.connect(Simple.new(id: "one"))

parallel_tasks = task_one.connect(Simple.new(id: "parallel_1"), Simple.new(id: "parallel_2"), Simple.new(id: "parallel_3"))

merge_task = MultiMerge.new(id: "merge_tasks")

merge_task.join(*parallel_tasks)
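Where synchronization passes control on once after all branches complete, a multi merge passes it on each time an incoming branch completes, so whatever follows it runs once per branch. The counting behaviour is sketched below with a hypothetical `ToyMultiMerge` class, not wolflow's `MultiMerge`:

```ruby
# Hypothetical sketch of multi-merge semantics: each completed incoming
# branch triggers a separate run of the downstream action.
class ToyMultiMerge
  attr_reader :runs

  def initialize(&downstream)
    @downstream = downstream
    @runs = 0
  end

  def branch_completed(branch_id)
    @runs += 1
    @downstream.call(branch_id)
  end
end

log = []
merge = ToyMultiMerge.new { |from| log << "after #{from}" }
%w[parallel_1 parallel_2 parallel_3].each { |id| merge.branch_completed(id) }
# log == ["after parallel_1", "after parallel_2", "after parallel_3"]
```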

Arbitrary Cycles

spec = WorkflowSpec.new(id: "cycle")

task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
cycle = task_two.connect(Cycle.new(id: "cycle"))
cycle.connect(task_one)

Structured Loop

spec = WorkflowSpec.new(id: "structured_loop")

task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
structured_loop = task_two.connect(StructuredLoop.new(id: "structured_loop"))
structured_loop.connect(->(_) { Time.now.hour < 13 }, task_one)
structured_loop.connect_else(Simple.new(id: "three"))
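As wired above, the body (`one`, then `two`) always runs at least once and the condition is checked after it: while it holds, control returns to `one`; otherwise it continues to `three`. In plain Ruby that is a post-test loop; the sketch replaces the clock-based condition with a counter so that it terminates predictably:

```ruby
trace = []
iterations = 0

# Same control flow as the structured loop above, as a post-test loop.
loop do
  trace << "one"
  trace << "two"
  iterations += 1
  break unless iterations < 2 # stands in for the loop's condition
end
trace << "three" # the else branch
```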

Recursion

spec = WorkflowSpec.new(id: "recursion")

task_one = spec.connect(Simple.new(id: "one"))
task_two = task_one.connect(Simple.new(id: "two"))
recursion = task_two.connect(Recursion.new(id: "recursion"))
recursion.connect(task_one)

Operators

In the examples above, Ruby lambdas are used to showcase the selection mechanism that decides whether a given conditional branch is followed:

# ...
choice_spec.connect(->(_task) { Time.now.hour < 13 }, if_spec)

Lambdas are not the only option, though; the only requirement for a selection mechanism is that it responds to #call(?Task):

class IsBeforeLunch
  def self.call(_task)
    Time.now.hour < 13
  end
end
choice_spec.connect(IsBeforeLunch, if_spec)
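Since only `#call` is required, any callable works as a condition: lambdas, `Method` objects, or classes and instances that define `call`. The sketch checks this with a hypothetical `follow_branch?` helper standing in for wherever wolflow invokes the condition, and uses a plain hash in place of a task:

```ruby
# Hypothetical stand-in for the engine evaluating a branch condition.
def follow_branch?(condition, task)
  condition.call(task) ? true : false
end

# A class-level callable, as in the README example.
class IsBeforeLunch
  def self.call(_task)
    Time.now.hour < 13
  end
end

# An instance-level callable, configured with the data key to check.
class HasKey
  def initialize(key)
    @key = key
  end

  def call(task)
    task.key?(@key)
  end
end

# A plain method, turned into a callable Method object below.
def foo_is_bar(task)
  task[:foo] == "bar"
end

task = { foo: "bar" } # a hash stands in for a task in this sketch
checks = [->(t) { t.key?(:foo) }, HasKey.new(:missing), method(:foo_is_bar)]
results = checks.map { |check| follow_branch?(check, task) } # => [true, false, true]
follow_branch?(IsBeforeLunch, task) # depends on the current time
```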

Neither option works, though, if you need to serialize workflows (lambdas and singleton classes can't be marshalled into hashes).

wolflow ships with operators: low-level, hash-serializable primitives that can describe the kinds of expressions most commonly found in conditionals.

There are 3 basic operators: "literals" (plain values), "attributes" (variables pointing at data in the workflow), and "operations" (which apply a reducer function to a sequence of arguments). Here are a few examples:

require "wolflow"

include Wolflow

lit = Operators::Literal.new(value: 3) #=> 3, serializes to { type: "literal", value: 3 }

lit2 = Operators::Literal.new(value: 4)
var = Operators::Attribute.new(name: "foo") #=> "foo", serializes to { type: "attribute", name: "foo" }
Operators::Operation.new(op: "+", members: [lit, lit2, var]) #=> "3 + 4 + foo", serializes to { type: "operation", op: "+", members: [{ type: "literal", value: 3 }, { type: "literal", value: 4 }, { type: "attribute", name: "foo" }] }
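A tree of such operators is evaluable and serializable at the same time, which is what makes it suitable for conditions that must be stored. The self-contained miniature below illustrates this with hypothetical `Lit`, `Attr`, and `Op` classes that mirror the hash shapes shown above; it is not wolflow's code, and reading attributes from a plain data hash is an assumption of the sketch (it needs Ruby 3.0+ for pattern matching):

```ruby
# Hypothetical miniature operator language: each node evaluates against a
# data hash and serializes to a plain hash.
class Lit
  def initialize(value)
    @value = value
  end

  def call(_data)
    @value
  end

  def to_hash
    { type: "literal", value: @value }
  end
end

class Attr
  def initialize(name)
    @name = name
  end

  # Look the attribute up in the data hash (symbol keys assumed).
  def call(data)
    data.fetch(@name.to_sym)
  end

  def to_hash
    { type: "attribute", name: @name }
  end
end

class Op
  def initialize(op, members)
    @op = op
    @members = members
  end

  # Evaluate each member, then fold them with the operator: 3 + 4 + foo.
  def call(data)
    @members.map { |m| m.call(data) }.reduce(@op.to_sym)
  end

  def to_hash
    { type: "operation", op: @op, members: @members.map(&:to_hash) }
  end
end

# Rebuild a tree from its hash form by dispatching on :type.
def build(hash)
  case hash
  in { type: "literal", value: } then Lit.new(value)
  in { type: "attribute", name: } then Attr.new(name)
  in { type: "operation", op:, members: } then Op.new(op, members.map { |m| build(m) })
  end
end

expr = Op.new("+", [Lit.new(3), Lit.new(4), Attr.new("foo")])
expr.call({ foo: 5 })                # => 12
build(expr.to_hash).call({ foo: 5 }) # => 12, after a hash round trip
```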

Finally, there is "base" (Operators::Base), from which the other operators inherit, and which can also be used to extend the operator language.

# example of a custom operator calculating the Levenshtein distance with the levenshtein gem:

require "levenshtein"

class LevenshteinDistance < Operators::Base
  # (strings: Array[Operators::Base], ?threshold: Operators::Base, **untyped) -> void
  # the threshold is itself an operator (a literal by default), so that it can be serialized
  def initialize(strings:, threshold: Operators::Literal.new(value: 0.8), **args)
    @strings = strings
    @threshold = threshold
    super(**args)
  end

  # (Task task) -> Float
  def call(task)
    # resolve each operand (literal, attribute, ...) against the task
    string1, string2 = @strings.map { |s| s.call(task) }

    Levenshtein.normalized_distance(string1, string2, @threshold.call(task))
  end

  def to_hash
    hs = super
    hs[:threshold] = @threshold.to_hash
    hs[:strings] = @strings.map(&:to_hash)
    hs
  end

  def self.from_hash(hash)
    case hash
    in { threshold: Hash => threshold, strings: [*, Hash, *] => strings, ** }
      super(hash.merge(threshold: Operators.from_hash(threshold), strings: strings.map{ |s| Operators.from_hash(s) }))
    else
      super
    end
  end
end

op = Operators.from_hash({ type: "levenshtein_distance", threshold: { type: "literal", value: 0.8 }, strings: [{ type: "literal", value: "test_string" }, { type: "attribute", name: "foo" }] })
#=> #<LevenshteinDistance:0x000000012a796960 ...

op.call(Task.new(data: { foo: "bar" })) #=> calculates levenshtein distance between "test_string" and "bar"
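If you would rather avoid the levenshtein gem dependency, the distance itself is short to compute. The sketch below is the classic dynamic-programming algorithm in plain Ruby, normalized by the length of the longer string; that normalization is a common choice but an assumption here, and the sketch does not implement the gem's threshold argument:

```ruby
# Classic dynamic-programming Levenshtein distance, keeping one row at a time.
def levenshtein(a, b)
  previous = (0..b.length).to_a
  a.each_char.with_index(1) do |char_a, i|
    current = [i]
    b.each_char.with_index(1) do |char_b, j|
      cost = char_a == char_b ? 0 : 1
      current << [
        previous[j] + 1,       # deletion
        current[j - 1] + 1,    # insertion
        previous[j - 1] + cost # substitution
      ].min
    end
    previous = current
  end
  previous.last
end

# Normalize to 0.0 (identical) .. 1.0 (completely different).
def normalized_levenshtein(a, b)
  longest = [a.length, b.length].max
  longest.zero? ? 0.0 : levenshtein(a, b).fdiv(longest)
end
```

A function like this could back the `call` method of a custom operator in place of `Levenshtein.normalized_distance`.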

Design Concepts

wolflow is designed to provide workflow engine primitives as plain Ruby objects, with a simple yet extensive API for defining, extending, and executing workflows. It is equally defined by what it explicitly does not support:

  • No custom business language to build workflows. It's impossible to provide such a language that fits all use cases; it is, however, possible to design your own language on top of it.
  • No support for standard workflow representations (like BPMN) out of the box. However, it's possible to build support on top of it.
  • No storage support. There are too many options to consider in this space. However, every object encodes to / decodes from Ruby hashes, so, for example, storing them in a database with JSON support should be fairly easy.
  • No execution engine. You may want to distribute task execution to thread pools, separate processes, or background jobs, and that choice is left to you.

Resources

  • Workflow Patterns documentation
  • flor, a ruby workflow engine with its own custom workflow definition "language", with built-in storage and execution mechanisms.
  • Spiff Workflow, a python workflow engine allowing in-code workflow definitions, with BPMN and DMN support.
  • Camunda, a workflow engine which is most commonly used "standalone", but can also be embedded in a Java (Spring Boot) application.

You can also find other interesting resources about workflow engines here

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.

Contributing

Bug reports and merge requests are welcome on GitLab at https://gitlab.com/os85/wolflow.