Module: Synapse::EventSourcing::AggregateRoot

Extended by:
ActiveSupport::Concern
Includes:
Domain::AggregateRoot, Member
Defined in:
lib/synapse/event_sourcing/aggregate_root.rb

Overview

Mixin for the root entity of an aggregate that is initialized from a historical event stream

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary

Attributes included from Domain::AggregateRoot

#deleted, #id

Instance Method Summary

Methods included from Domain::AggregateRoot

#add_registration_listener, #mark_committed, #uncommitted_event_count, #uncommitted_events

Instance Attribute Details

#initial_version ⇒ Integer (readonly)

The sequence number of the first event that the aggregate was initialized from

If the aggregate was initialized from a snapshot, this should be reset to the sequence number of the last event in the snapshot. Otherwise, this will be the sequence number of the first event contained in the event stream used to initialize the aggregate.

Returns:

  • (Integer)


# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 29

def initial_version
  @initial_version
end

Instance Method Details

#handle_member_event(payload, metadata = nil) ⇒ undefined

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Called when a member of the aggregate publishes an event

This is only meant to be invoked by entities that are members of this aggregate

Parameters:

  • payload (Object)
  • metadata (Hash) (defaults to: nil)

Returns:

  • (undefined)


# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 75

def handle_member_event(payload, metadata = nil)
  apply payload, metadata
end
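
As an illustration of how a member entity might hand an event to its root, here is a minimal sketch. SketchRoot and LineItem are hypothetical stand-ins, not Synapse classes, and the apply shown here only records its arguments; the behavior of the real apply is assumed, not taken from this page.

```ruby
# Hypothetical sketch; SketchRoot and LineItem are not Synapse classes.
class SketchRoot
  attr_reader :applied

  def initialize
    @applied = []
  end

  # Same shape as the documented method: forward the member's event to the root
  def handle_member_event(payload, metadata = nil)
    apply payload, metadata
  end

  private

  # Stand-in for the real apply; here it only records what it was given
  def apply(payload, metadata = nil)
    @applied << [payload, metadata]
  end
end

# A member entity that publishes its events through the aggregate root
class LineItem
  def initialize(root)
    @root = root
  end

  def change_quantity(quantity)
    @root.handle_member_event({ quantity: quantity }, { source: 'line_item' })
  end
end

root = SketchRoot.new
LineItem.new(root).change_quantity(2)
```

In this sketch the member never applies the event itself; it only passes the payload and metadata to the root, which matches the note that the method is meant to be invoked by member entities only.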

#initialize_from_stream(stream) ⇒ undefined

Initializes the state of this aggregate from the given domain event stream

Parameters:

  • stream (DomainEventStream)

Returns:

  • (undefined)

Raises:

  • (RuntimeError)

    If the aggregate has already been initialized



# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 47

def initialize_from_stream(stream)
  if uncommitted_event_count > 0
    raise 'Aggregate has already been initialized'
  end

  # If this is loaded from a snapshot, don't pre-initialize
  pre_initialize unless @initial_version

  @initial_version = stream.peek.sequence_number

  last_sequence_number = nil

  stream.each do |event|
    last_sequence_number = event.sequence_number
    handle_recursively event
  end

  initialize_event_container last_sequence_number
end
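
The replay loop above can be tried in isolation with a small stand-in. Everything in this sketch is hypothetical (SketchEvent, ArrayEventStream, SketchAggregate); it assumes only what the listing shows, namely that the stream responds to peek and each and that events expose sequence_number. The real pre_initialize, handle_recursively and initialize_event_container are replaced by trivial substitutes.

```ruby
# Hypothetical stand-ins; none of these classes come from Synapse itself.
SketchEvent = Struct.new(:sequence_number, :payload)

# Minimal stream exposing only the two calls used by the listing: peek and each
class ArrayEventStream
  def initialize(events)
    @events = events
  end

  # Returns the next event without consuming it
  def peek
    @events.first
  end

  def each(&block)
    @events.each(&block)
  end
end

class SketchAggregate
  attr_reader :initial_version, :version, :applied

  def initialize_from_stream(stream)
    @applied = []
    # The first event in the stream determines the initial version
    @initial_version = stream.peek.sequence_number

    last_sequence_number = nil
    stream.each do |event|
      last_sequence_number = event.sequence_number
      @applied << event.payload # stands in for handle_recursively
    end

    # Stands in for initialize_event_container
    @version = last_sequence_number
  end
end

stream = ArrayEventStream.new([
  SketchEvent.new(3, :opened),
  SketchEvent.new(4, :renamed),
  SketchEvent.new(5, :closed)
])

aggregate = SketchAggregate.new
aggregate.initialize_from_stream(stream)
```

Under these assumptions, aggregate.initial_version is 3 (the first event in the stream) and aggregate.version is 5 (the last event replayed).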

#reset_initial_version ⇒ undefined

Resets the initial version to the current version of the aggregate

Returns:

  • (undefined)


# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 38

def reset_initial_version
  @initial_version = last_committed_sequence_number
end
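
A minimal sketch of the effect, using a hypothetical SnapshottedAggregate class that stores the last committed sequence number in an instance variable (in the real mixin it is obtained from last_committed_sequence_number, whose implementation is not shown on this page):

```ruby
# Hypothetical sketch; SnapshottedAggregate is not a Synapse class.
class SnapshottedAggregate
  attr_reader :initial_version

  def initialize(initial_version, last_committed_sequence_number)
    @initial_version = initial_version
    @last_committed_sequence_number = last_committed_sequence_number
  end

  def version
    @last_committed_sequence_number
  end

  # Same idea as the documented method: the initial version becomes the current version
  def reset_initial_version
    @initial_version = @last_committed_sequence_number
  end
end

aggregate = SnapshottedAggregate.new(0, 41)
aggregate.reset_initial_version
```

After the call, initial_version and version both report 41. This corresponds to the case described for #initial_version, where an aggregate initialized from a snapshot should report the sequence number of the last event in the snapshot.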

#version ⇒ Integer

Returns the sequence number of the last committed event.

Returns:

  • (Integer)

    The sequence number of the last committed event



# File 'lib/synapse/event_sourcing/aggregate_root.rb', line 32

def version
  last_committed_sequence_number
end