Aggregates Icon

Aggregates

A ruby gem for writing CQRS applications with pluggable components.

Warning: This Gem is in active development and probably doesn't work correctly. Tests are really light.

Gem Version Ruby Style Guide

Table of Contents

Features

  • Pluggable Event / Command Storage Backends
  • Tools for Command Validation, Filtering, and Execution.
  • Opinioned structure for CQRS, Domain-Driven Design, and Event Sourcing.

Requirements

  1. Ruby 3.0+

Setup

To install, run:

gem install aggregates

Or Add the following to your Gemfile:

gem "aggregates"

Usage

Defining AggregateRoots

An AggregateRoot is a grouping of domain object(s) that work to encapsulate a single part of your Domain or Business Logic. The general design of aggregate roots should be as follows:

  • Create functions that encapsulate different operations on your Aggregate Roots. These functions should enforce buisiness logic constraints and then capture state changes by creating events.
  • Create event handlers that actually perform the state changes captured by those events.

A simple example is below:

class Post < Aggregates::AggregateRoot
  # Write functions that encapsulate business logic.
  def publish(command)
    apply EventPublished, body: command.body, category: command.category
  end

  # Modify the state of the aggregate from the emitted events.
  on EventPublished do |event|
    @body = event.body
    @category = event.category
  end
end

Note: the message-handling DSL (on) supports passing a super class of any given event as well. Every on block that applies to the event will be called in order from most specific to least specific.

Creating Commands

Commands are a type of domain message that define the shape and contract of data needed to perform an action. Essentially, they provide the api for interacting with your domain. Commands should have descriptive names capturing the change they are intended to make. For instance, ChangeUserEmail or AddComment.

class PublishPost < Aggregates::Command
  attribute :body, Types::String
  attribute :category, Types::String

  # Input Validation Handled via dry-validation.
  # Reference: https://dry-rb.org/gems/dry-validation/1.6/
  class Contract < Contract
    rule(:body) do
      key.failure('Post not long enough') unless value.length > 10
    end
  end
end

Creating Events

An Event describes something that happened. They are named in passed tense. For instance, if the user's email has changed, then you might create an event type called UserEmailChanged.

class PublishPost < Aggregates::Command
  attribute :body, Types::String
  attribute :category, Types::String
end

Processing Commands

The goal of a CommandProcessor is to route commands that have passed validation and filtering. They should invoke business logic on their respective aggregates. Doing so is accomplished by using the same message-handling DSL as in our AggregateRoots, this time for commands.

A helper function, with_aggregate, is provided to help retrieve the appropriate aggregate for a given command.

class PostCommandProcessor < Aggregates::CommandProcessor
  on PublishPost do |command|
    with_aggregate(Post, command) do |post|
      post.publsh(command)
    end
  end
end

Note: the message-handling DSL (on) supports passing a super class of any given event as well. Every on block that applies to the event will be called in order from most specific to least specific.

Filtering Commands

There are times where commands should not be executed by the domain logic. You can opt to include a condition in your command processor or aggregate. However, that is not always extensible if you have repeated logic between many commands. Additionally, it violates the single responsiblity principal.

Instead, it is best to support this kind of filtering logic using CommandFilters. A CommandFilter uses the same Message Handling message-handling DSL as the rest of the Aggregates gem. This time, it needs to return a true/false back to the gem to determine whether or not (true/false) the command should be allowed. Many command filters can provide many blocks of the on DSL. If any one of the filters rejects the command then the command will not be procesed.

class UpdatePostCommand < Aggregates::Command
  attribute :commanding_user_id, Types::String
end

class UpdatePostBody < UpdatePostCommand
  attribute :body, Types::String
end

class PostCommandFilter < Aggregates::CommandFilter
  on UpdatePostCommand do |command|
    with_aggregate(Post, command) do |post|
      post.owner_id == command.commanding_user_id
    end
  end
end

In this example, we are using a super class of UpdatePostBody. As with all MessageProcessors, calling on with a super class will be called when any child class is being processed. In other words, on UpdatePostCommand will be called when you call Aggregates.execute_command with an instance of UpdatePostBody.

Processing Events

Event processors are responsible for responding to events and effecting changes on things that are not the aggregates themselves. Here is where the read side of your CQRS model can take place. Since Aggregates does not enforce a storage solution for any component of the application, you will likely want to provide a helper mechanism for updating projections of aggregates into your read model.

Additionally, the EventProcessor type can be used to perform other side effects in other systems. Examples could include sending an email to welcome a user, publish the event to a webhook for a subscribing micro service, or much more.

class RssUpdateProcessor < Aggregates::EventProcessor
  def update_feed_for_new_post(event)
    # ...
  end

  on EventPublished do |event|
    update_feed_for_new_post(event)
  end
end

Note: the message-handling DSL (on) supports passing a super class of any given event as well. Every on block that applies to the event will be called in order from most specific to least specific.

Executing Commands

aggregate_id = Aggregates.new_aggregate_id
command = CreateThing.new(foo: 1, bar: false, aggregate_id: aggregate_id)
Aggregates.execute_command command

increment = IncrementFooThing.new(aggregate_id: aggregate_id)
toggle = ToggleBarThing.new(aggregate_id: aggregate_id)
Aggregates.execute_commands increment, toggle

Auditing Aggregates

aggregate_id = Aggregates.new_aggregate_id
# ... Commands and stuff happened.
auditor = Aggregates.audit MyAggregateType aggregate_id

# Each of these returns a list to investigate using.
events = auditor.events # Or events_processed_by(time) or events_processed_after(time)
commands = auditor.commands # Or commands_processed_by(time) or commands_processed_after(time)

# Or....
# View the state of an aggregate at a certain pont in time.
aggregate_at_time = auditor.inspect_state_at(Time.now - 1.hour)

Configuring

Storage Backends

Storage Backends at the method by which events and commands are stored in the system.

Aggregates.configure do |config|
  config.store_with MyAwesomeStorageBackend.new
end
Dynamoid

If Aggregates can require 'dynamoid' then it will provide the Aggregates::Dynamoid::DynamoidStorageBackend that stores using the Dynmoid Gem for AWS DynamoDB.

Adding Command Processors

Aggregates.configure do |config|
  # May call this method many times with different processors.
  config.process_commands_with PostCommandProcessor.new
end

Adding Event Processors

Aggregates.configure do |config|
  # May call this method many times with different processors.
  config.process_events_with RssUpdateProcessor.new
end

Adding Command Filters

Aggregates.configure do |config|
  config.filter_commands_with MyCommandFilter.new
end

Development

To contribute, run:

git clone https://github.com/resilient-vitality/aggregates.git
cd aggregates
bin/setup

You can also use the IRB console for direct access to all objects:

bin/console

Tests

To test, run:

bundle exec rake

Versioning

Read Semantic Versioning for details. Briefly, it means:

  • Major (X.y.z) - Incremented for any backwards incompatible public API changes.
  • Minor (x.Y.z) - Incremented for new, backwards compatible, public API enhancements/fixes.
  • Patch (x.y.Z) - Incremented for small, backwards compatible, bug fixes.

Code of Conduct

Please note that this project is released with a CODE OF CONDUCT. By participating in this project you agree to abide by its terms.

Contributions

Read CONTRIBUTING for details.

License

Copyright 2021 Resilient Vitality. Read LICENSE for details.

History

Read CHANGES for details. Built with Gemsmith.

Credits

Developed by Zach Probst at Resilient Vitality.