Module: Synapse::EventSourcing::AggregateRoot

Extended by:
ActiveSupport::Concern
Includes:
Domain::AggregateRoot, Member
Defined in:
lib/synapse/event_sourcing/aggregate_root.rb

Overview

Mixin for the root entity of an aggregate that is initialized from a historical event stream

Handling events

For ease of use, the event mapping DSL is included in event-sourced aggregates. This includes both the aggregate root and any entities that are a member of the aggregate.

This mapping DSL can be used to map handlers for events that have been applied to the aggregate

class Order
  include Synapse::EventSourcing::AggregateRoot

  def add_item(sku, quantity, unit_price)
    # Do business logic validation here
    apply(new ItemsAddedToOrder(id, sku, quantity, unit_price))
  end

  map_event ItemsAddedToOrder do |event|
    @value = @value + (event.quantity * event.unit_price)
    @line_items.add(OrderLineItem.new(sku, quantity, unit_price))
  end
end

Events applied to the aggregate root are cascaded down to any child entities of the aggregate root. Events applied to a child entity are actually applied to the aggregate root’s event container, and are then cascaded down to the child entities.

Initialization

An aggregate can be initialized in three ways:

  • Created by the caller using new, usually to create a new aggregate

  • Allocated, then initialized from an event stream

  • Deserialized from a snapshot

Because of the different ways the aggregate can be created, it is necessary to separate out the logic needed to create data structures required by the aggregate to function.

Ruby does not support method overloading, so there is a hook provided to do any logic necessary to instantiate the aggregate, such as creating collections.

class Order
  include Synapse::EventSourcing::AggregateRoot

  def initialize(id, max_value)
    pre_initialize
    apply(OrderCreated.new(id, max_value))
  end

protected

  def pre_initialize
    @line_items = Set.new
  end
end

Instead of using the pre-initialization hook, you can also do lazy initialization.

class Order
  include Synapse::EventSourcing::AggregateRoot

  def line_items
    @line_items ||= Set.new
  end
end

Snapshots

When the event stream for an aggregate becomes large, it can put a drain on the application to have to load the entire stream of an aggregate just to perform a single operation upon it.

To solve this issue, Synapse supports snapshotting aggregates. The built-in way to snapshot is simply to serialize the aggregate root, since it contains entire state of the aggregate.

When using a marshalling serializer, like the built-in Ruby marshaller or ones like Oj and Ox, no work needs to be done to prepare the aggregate for serialization. However, when using the attribute serializer, you have to treat snapshots as mementos.

class Order
  include Synapse::EventSourcing::AggregateRoot

  def attributes
    line_items = @line_items.map { |li| li.attributes }
    { id: @id, value: @value }
  end

  # Note that this is called after #allocate
  def attributes=(attributes)
    @id = attributes[:id]
    @value = attributes[:value]
    # Yeah, it's pretty ugly to support this
    @line_items = attributes[:line_items].map do |line_item|
      OrderLineItem.allocate.tap { |li| li.attributes = line_item }
    end
  end
end

It would be nice to have something like XStream to make serialization easier, but as far as I can tell, there’s nothing even close to it for Ruby.

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Attributes included from Domain::AggregateRoot

#deleted, #id

Instance Method Summary collapse

Methods included from Domain::AggregateRoot

#add_registration_listener, #mark_committed, #uncommitted_event_count, #uncommitted_events

Instance Attribute Details

#initial_versionInteger (readonly)

The sequence number of the first event that the aggregate was initialized from

If the aggregate was initialized from a snapshot, this should be reset to the sequence number of the last event in the snapshot. Otherwise, this will be the sequence number of the first event contained in the event stream used to initialize the aggregate.

Returns:

  • (Integer)


129
130
131
# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 129

def initial_version
  @initial_version
end

Instance Method Details

#handle_member_event(payload, metadata = nil) ⇒ undefined

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called when a member of the aggregate publishes an event

This is only meant to be invoked by entities that are members of this aggregate

Parameters:

  • payload (Object)
  • metadata (Hash) (defaults to: nil)

Returns:

  • (undefined)


175
176
177
# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 175

def handle_member_event(payload,  = nil)
  apply payload, 
end

#initialize_from_stream(stream) ⇒ undefined

Initializes the state of this aggregate from the given domain event stream

Parameters:

  • stream (DomainEventStream)

Returns:

  • (undefined)

Raises:

  • (RuntimeError)

    If aggregate has already been initialized



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 147

def initialize_from_stream(stream)
  if uncommitted_event_count > 0
    raise 'Aggregate has already been initialized'
  end

  # If this is loaded from a snapshot, don't pre-initialize
  pre_initialize unless @initial_version

  @initial_version = stream.peek.sequence_number

  last_sequence_number = nil

  stream.each do |event|
    last_sequence_number = event.sequence_number
    handle_recursively event
  end

  initialize_event_container last_sequence_number
end

#reset_initial_versionundefined

Resets the initial version to the current version of the aggregate

Returns:

  • (undefined)


138
139
140
# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 138

def reset_initial_version
  @initial_version = last_committed_sequence_number
end

#versionInteger

Returns The sequence number of the last committed event.

Returns:

  • (Integer)

    The sequence number of the last committed event



132
133
134
# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 132

def version
  last_committed_sequence_number
end