Aggregates
A Ruby gem for writing CQRS applications with pluggable components.
Warning: This Gem is in active development and probably doesn't work correctly. Tests are really light.
Table of Contents
- Features
- Requirements
- Setup
- Usage
- Development
- Tests
- Versioning
- Code of Conduct
- Contributions
- License
- History
- Credits
Features
- Pluggable Event / Command Storage Backends
- Tools for Command Validation, Filtering, and Execution.
- Opinionated structure for CQRS, Domain-Driven Design, and Event Sourcing.
Requirements
Setup
To install, run:
gem install aggregates
Or add the following to your Gemfile:
gem "aggregates"
Usage
Defining AggregateRoots
An AggregateRoot is a grouping of domain object(s) that work to encapsulate a single part of your Domain or Business Logic. The general design of aggregate roots should be as follows:
- Create functions that encapsulate different operations on your Aggregate Roots. These functions should enforce business logic constraints and then capture state changes by creating events.
- Create event handlers that actually perform the state changes captured by those events.
A simple example is below:
class Post < Aggregates::AggregateRoot
# Write functions that encapsulate business logic.
def publish(command)
apply EventPublished, body: command.body, category: command.category
end
# Modify the state of the aggregate from the emitted events.
on EventPublished do |event|
@body = event.body
@category = event.category
end
end
Note: the message-handling DSL (on) supports passing a super class of any given event
as well. Every on block that applies to the event will be called in order from most specific to least specific.
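The ordering rule in the note above can be illustrated with a small, self-contained sketch. This is not the gem's implementation: SketchProcessor, OrderDemo, and the three event classes are hypothetical names made up for the example, and the sketch assumes that "most specific" means closest to the message's own class in its ancestor chain.

```ruby
# Illustrative only: a tiny stand-in for an `on`-style DSL, not the gem's code.
class SketchProcessor
  # Each subclass keeps its own list of [message_class, block] pairs.
  def self.handlers
    @handlers ||= []
  end

  # Register a block for a message class (or any of its superclasses).
  def self.on(message_class, &block)
    handlers << [message_class, block]
  end

  # Run every matching handler, most specific class first.
  def handle(message)
    ancestors = message.class.ancestors
    self.class.handlers
        .select { |klass, _| message.is_a?(klass) }
        .sort_by { |klass, _| ancestors.index(klass) }
        .each { |_, block| instance_exec(message, &block) }
  end
end

# Hypothetical event hierarchy used only for this demonstration.
class BaseEvent; end
class PostEvent < BaseEvent; end
class PostPublished < PostEvent; end

class OrderDemo < SketchProcessor
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Registered out of order on purpose; dispatch order depends on the hierarchy.
  on(BaseEvent)     { |_event| @calls << :base }
  on(PostPublished) { |_event| @calls << :published }
  on(PostEvent)     { |_event| @calls << :post }
end

demo = OrderDemo.new
demo.handle(PostPublished.new)
p demo.calls # => [:published, :post, :base]
```

Sorting by position in the ancestor chain is one simple way to get specific-before-general ordering; the gem may achieve the same result differently.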
Creating Commands
Commands are a type of domain message that defines the shape and contract of the data needed to perform an action. Essentially, they provide the API for interacting with your domain. Commands should have descriptive names that capture the change they are intended to make. For instance, ChangeUserEmail or AddComment.
class PublishPost < Aggregates::Command
attribute :body, Types::String
attribute :category, Types::String
# Input Validation Handled via dry-validation.
# Reference: https://dry-rb.org/gems/dry-validation/1.6/
class Contract < Contract
rule(:body) do
key.failure('Post not long enough') unless value.length > 10
end
end
end
Creating Events
An Event describes something that happened. Events are named in the past tense.
For instance, if the user's email has changed, then you might create an event type called
UserEmailChanged.
# The event applied by Post#publish in the aggregate example above.
class EventPublished < Aggregates::Event
attribute :body, Types::String
attribute :category, Types::String
end
Processing Commands
The goal of a CommandProcessor is to route commands that have passed validation and
filtering. They should invoke business logic on their respective aggregates. Doing so is accomplished by using the same message-handling DSL as in our AggregateRoots, this time for commands.
A helper function, with_aggregate, is provided to help retrieve the appropriate aggregate
for a given command.
class PostCommandProcessor < Aggregates::CommandProcessor
on PublishPost do |command|
with_aggregate(Post, command) do |post|
post.publish(command)
end
end
end
Note: the message-handling DSL (on) supports passing a super class of any given command
as well. Every on block that applies to the command will be called in order from most specific to least specific.
Filtering Commands
There are times when commands should not be executed by the domain logic. You can opt to include a condition in your command processor or aggregate. However, that does not scale well if the same logic is repeated across many commands. Additionally, it violates the single responsibility principle.
Instead, it is best to support this kind of filtering logic using CommandFilters. A CommandFilter uses the same message-handling DSL as the rest of the Aggregates gem. This time, each on block needs to return true or false to tell the gem whether the command should be allowed. A command filter can provide many on blocks, and many command filters can be registered. If any one of the filters rejects the command, then the command will not be processed.
class UpdatePostCommand < Aggregates::Command
attribute :commanding_user_id, Types::String
end
class UpdatePostBody < UpdatePostCommand
attribute :body, Types::String
end
class PostCommandFilter < Aggregates::CommandFilter
on UpdatePostCommand do |command|
with_aggregate(Post, command) do |post|
post.owner_id == command.commanding_user_id
end
end
end
In this example, the filter is registered on UpdatePostCommand, a super class of UpdatePostBody.
As with all MessageProcessors, an on block registered with a super class
is called when any child class is being processed. In other words,
the on UpdatePostCommand block will be called when you call Aggregates.execute_command
with an instance of UpdatePostBody.
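The "every filter must allow the command" rule can be sketched in plain Ruby. This is a minimal illustration, not the gem's code: SketchCommand, the two lambdas, and allowed? are hypothetical names, and lambdas stand in for real CommandFilter classes.

```ruby
# Illustrative only: plain lambdas stand in for command filters.
SketchCommand = Struct.new(:commanding_user_id, :owner_id, :body)

# Each filter answers true (allow) or false (reject).
owner_only = ->(command) { command.commanding_user_id == command.owner_id }
non_empty_body = ->(command) { !command.body.to_s.strip.empty? }

# A command is allowed only when every filter allows it.
def allowed?(command, filters)
  filters.all? { |filter| filter.call(command) }
end

filters = [owner_only, non_empty_body]

# Same user as the owner, non-empty body: both filters allow it.
accepted = allowed?(SketchCommand.new("user-1", "user-1", "Hello"), filters)

# Different user than the owner: the first filter rejects it.
rejected = allowed?(SketchCommand.new("user-2", "user-1", "Hello"), filters)
```

Because all? stops at the first false result, later filters are not consulted once one filter has rejected the command.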
Processing Events
Event processors are responsible for responding to events and effecting changes on things
that are not the aggregates themselves. Here is where the read side of your CQRS model can take
place. Since Aggregates does not enforce a storage solution for any component of the application, you will likely want to provide a helper mechanism for updating projections of aggregates into your read model.
Additionally, the EventProcessor type can be used to perform side effects in other systems. Examples include sending an email to welcome a user or publishing the event to a webhook for a subscribing microservice.
class RssUpdateProcessor < Aggregates::EventProcessor
def update_feed_for_new_post(event)
# ...
end
on EventPublished do |event|
update_feed_for_new_post(event)
end
end
Note: the message-handling DSL (on) supports passing a super class of any given event
as well. Every on block that applies to the event will be called in order from most specific to least specific.
Executing Commands
aggregate_id = Aggregates.new_aggregate_id
command = CreateThing.new(foo: 1, bar: false, aggregate_id: aggregate_id)
Aggregates.execute_command command
increment = IncrementFooThing.new(aggregate_id: aggregate_id)
toggle = ToggleBarThing.new(aggregate_id: aggregate_id)
Aggregates.execute_commands increment, toggle
Auditing Aggregates
aggregate_id = Aggregates.new_aggregate_id
# ... Commands and stuff happened.
auditor = Aggregates.audit MyAggregateType, aggregate_id
# Each of these returns a list that you can inspect.
events = auditor.events # Or events_processed_by(time) or events_processed_after(time)
commands = auditor.commands # Or commands_processed_by(time) or commands_processed_after(time)
# Or....
# View the state of an aggregate at a certain point in time.
aggregate_at_time = auditor.inspect_state_at(Time.now - 3600) # One hour ago.
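Viewing state at a point in time follows from event sourcing: state is rebuilt by replaying the events recorded up to that moment. The sketch below shows the idea in plain Ruby. It is not how the auditor is implemented; RecordedEvent, PostSnapshot, state_at, and the event names are all hypothetical.

```ruby
# Illustrative only: replaying stored events up to a cutoff time.
RecordedEvent = Struct.new(:name, :data, :created_at)

class PostSnapshot
  attr_reader :body, :category

  # Apply one event to this snapshot, mutating its state.
  def apply(event)
    case event.name
    when :post_published
      @body = event.data[:body]
      @category = event.data[:category]
    when :body_updated
      @body = event.data[:body]
    end
    self
  end
end

# Rebuild state from every event recorded at or before `time`.
def state_at(events, time)
  events
    .select { |event| event.created_at <= time }
    .sort_by(&:created_at)
    .each_with_object(PostSnapshot.new) { |event, snapshot| snapshot.apply(event) }
end

start = Time.at(0)
history = [
  RecordedEvent.new(:post_published, { body: "Draft", category: "news" }, start),
  RecordedEvent.new(:body_updated, { body: "Final" }, start + 3600)
]

early = state_at(history, start + 60)   # Only the publish event has happened.
late = state_at(history, start + 7200)  # Both events have happened.
```

Here early.body is "Draft" and late.body is "Final", while the category is "news" in both snapshots.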
Configuring
Storage Backends
Storage Backends are the mechanism by which events and commands are stored in the system.
Aggregates.configure do |config|
config.store_with MyAwesomeStorageBackend.new
end
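To make the idea of a pluggable backend concrete, here is a rough sketch of an in-memory event store. The method names (store_event, events_for) are assumptions chosen for illustration, not the interface that config.store_with expects; consult the gem's source for the real contract before writing a backend.

```ruby
# Hypothetical interface: these method names are assumptions for illustration,
# not the contract the Aggregates gem actually requires.
class InMemoryStoreSketch
  def initialize
    # Missing keys start out as an empty list of events.
    @events = Hash.new { |hash, key| hash[key] = [] }
  end

  # Append an event to the stream for one aggregate.
  def store_event(aggregate_id, event)
    @events[aggregate_id] << event
    event
  end

  # Return a copy so callers cannot mutate the stored stream.
  def events_for(aggregate_id)
    @events.key?(aggregate_id) ? @events[aggregate_id].dup : []
  end
end

store = InMemoryStoreSketch.new
store.store_event("post-1", { type: "EventPublished", body: "Hello" })
store.store_event("post-1", { type: "BodyUpdated", body: "Hello again" })
```

An append-only list per aggregate id is the core shape of most event stores; a real backend would add persistence and likewise store commands.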
Dynamoid
If Aggregates can require 'dynamoid', then it will provide the Aggregates::Dynamoid::DynamoidStorageBackend, which
stores events and commands in AWS DynamoDB using the Dynamoid gem.
Adding Command Processors
Aggregates.configure do |config|
# May call this method many times with different processors.
config.process_commands_with PostCommandProcessor.new
end
Adding Event Processors
Aggregates.configure do |config|
# May call this method many times with different processors.
config.process_events_with RssUpdateProcessor.new
end
Adding Command Filters
Aggregates.configure do |config|
config.filter_commands_with MyCommandFilter.new
end
Development
To contribute, run:
git clone https://github.com/resilient-vitality/aggregates.git
cd aggregates
bin/setup
You can also use the IRB console for direct access to all objects:
bin/console
Tests
To test, run:
bundle exec rake
Versioning
Read Semantic Versioning for details. Briefly, it means:
- Major (X.y.z) - Incremented for any backwards incompatible public API changes.
- Minor (x.Y.z) - Incremented for new, backwards compatible, public API enhancements/fixes.
- Patch (x.y.Z) - Incremented for small, backwards compatible, bug fixes.
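In practice, these rules are what make a pessimistic version constraint safe to use in a Gemfile. The sketch below uses RubyGems' own Gem::Requirement and Gem::Version classes to show which versions such a constraint accepts. The version numbers are made up for illustration and do not correspond to actual releases of this gem.

```ruby
require "rubygems"

# Version numbers below are made up for illustration.
# "~> 1.2" accepts any version >= 1.2 and < 2.0.
requirement = Gem::Requirement.new("~> 1.2")

# A later minor release is backwards compatible, so it is accepted.
compatible_minor = requirement.satisfied_by?(Gem::Version.new("1.5.0"))

# A new major release may break the public API, so it is rejected.
breaking_major = requirement.satisfied_by?(Gem::Version.new("2.0.0"))
```

Here compatible_minor is true and breaking_major is false.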
Code of Conduct
Please note that this project is released with a CODE OF CONDUCT. By participating in this project you agree to abide by its terms.
Contributions
Read CONTRIBUTING for details.
License
Copyright 2021 Resilient Vitality. Read LICENSE for details.
History
Read CHANGES for details. Built with Gemsmith.
Credits
Developed by Zach Probst at Resilient Vitality.