Class: Sequent::Core::AggregateRepository

Inherits:
Object
Defined in:
lib/sequent/core/aggregate_repository.rb

Overview

Repository for aggregates.

Implements the Unit-Of-Work and Identity-Map patterns to ensure each aggregate is only loaded once per transaction and that you always get the same aggregate instance back.

On commit, all aggregates associated with the Unit-Of-Work are queried for uncommitted events. After these events are persisted, they are cleared from the aggregate.

The repository keeps track of the Unit-Of-Work per thread, so a single repository instance can be shared between threads.
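
The per-thread bookkeeping can be illustrated with a minimal sketch. It is not Sequent's implementation: TinyRepository and its KEY constant are hypothetical names, and the only thing taken from this page is the idea of keeping the identity map under a thread-local key.

```ruby
# Illustrative stand-in; TinyRepository and KEY are hypothetical names,
# not part of Sequent.
class TinyRepository
  KEY = :tiny_repository_aggregates

  # Identity map for the current thread, created lazily on first use.
  def aggregates
    Thread.current[KEY] ||= {}
  end

  def add(id, aggregate)
    aggregates[id] = aggregate
  end

  # Drops the current thread's map only.
  def clear
    Thread.current[KEY] = nil
  end
end

repo = TinyRepository.new
repo.clear
repo.add('a-1', Object.new)

# A second thread sharing the same repository object starts with an empty map.
other_thread_ids = Thread.new { repo.aggregates.keys }.value
puts other_thread_ids.inspect      # prints: []
puts repo.aggregates.keys.inspect  # prints: ["a-1"]
```

Because the state lives in the thread rather than in the repository object, the object itself carries no mutable fields that threads could race on.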

Defined Under Namespace

Classes: AggregateNotFound, HasUncommittedEvents, NonUniqueAggregateId

Constant Summary

AGGREGATES_KEY = 'Sequent::Core::AggregateRepository::aggregates'.to_sym

Key under which the Unit-Of-Work is stored in thread-local storage.

Instance Method Details

#add_aggregate(aggregate) ⇒ Object

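Adds the given aggregate to the repository (or unit of work). Raises NonUniqueAggregateId if a different instance with the same id is already registered.

The aggregates in the unit of work are only ‘processed’ when commit is called; at that point all their uncommitted_events are stored in the event_store.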



# File 'lib/sequent/core/aggregate_repository.rb', line 40

def add_aggregate(aggregate)
  existing = aggregates[aggregate.id]
  if existing && !existing.equal?(aggregate)
    fail NonUniqueAggregateId.new(aggregate, aggregates[aggregate.id])
  else
    aggregates[aggregate.id] = aggregate
  end
end
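
The guard above compares object identity with equal?, not value equality. The following sketch shows the consequence under stated assumptions: Account, DuplicateId and SketchUnitOfWork are hypothetical stand-ins, and only the shape of the check is taken from add_aggregate.

```ruby
# Hypothetical stand-ins; only the guard mirrors the method above.
Account = Struct.new(:id)

class DuplicateId < StandardError; end

class SketchUnitOfWork
  def initialize
    @aggregates = {}
  end

  def add_aggregate(aggregate)
    existing = @aggregates[aggregate.id]
    # equal? compares object identity, so re-adding the same instance is allowed.
    raise DuplicateId, aggregate.id if existing && !existing.equal?(aggregate)

    @aggregates[aggregate.id] = aggregate
  end
end

uow = SketchUnitOfWork.new
first = Account.new('acc-1')
uow.add_aggregate(first)
uow.add_aggregate(first) # same instance: accepted

begin
  # Equal by value (Struct#==) but a different object, so it is rejected.
  uow.add_aggregate(Account.new('acc-1'))
rescue DuplicateId => e
  puts "rejected duplicate id #{e.message}"
end
```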

#clear ⇒ Object

Clears the Unit of Work for the current thread, without checking for uncommitted events.



# File 'lib/sequent/core/aggregate_repository.rb', line 164

def clear
  Thread.current[AGGREGATES_KEY] = nil
end

#clear! ⇒ Object

Clears the Unit of Work.

A HasUncommittedEvents is raised when there are uncommitted_events in the Unit of Work.



# File 'lib/sequent/core/aggregate_repository.rb', line 171

def clear!
  fail HasUncommittedEvents if aggregates.values.any? { |x| !x.uncommitted_events.empty? }

  clear
end
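
The difference between clear and clear! can be sketched as follows. PendingAggregate, PendingEvents and SketchWork are hypothetical names used for illustration only; the sketch keeps its aggregates in an instance variable rather than in thread-local storage.

```ruby
# Hypothetical stand-ins: PendingAggregate, PendingEvents and SketchWork
# are not Sequent classes.
PendingAggregate = Struct.new(:uncommitted_events)

class PendingEvents < StandardError; end

class SketchWork
  attr_reader :aggregates

  def initialize(aggregates)
    @aggregates = aggregates
  end

  # Unconditional: drops everything that was registered.
  def clear
    @aggregates = {}
  end

  # Guarded: refuses to drop aggregates that still carry events.
  def clear!
    raise PendingEvents if aggregates.values.any? { |a| !a.uncommitted_events.empty? }

    clear
  end
end

work = SketchWork.new({ 'a-1' => PendingAggregate.new([:name_changed]) })
begin
  work.clear!
rescue PendingEvents
  puts 'clear! refused: uncommitted events present'
end
work.clear
puts work.aggregates.size # prints: 0
```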

#commit(command) ⇒ Object

Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store.

The events are handed to the EventStore grouped per AggregateRoot, in the order in which the AggregateRoots were loaded. Within each AggregateRoot, events are stored (and therefore published) in the order in which they were applied.

The command is attached for traceability, so you can see which command resulted in which events.

This is all abstracted away if you use the Sequent::Core::CommandService.



# File 'lib/sequent/core/aggregate_repository.rb', line 152

def commit(command)
  updated_aggregates = aggregates.values.reject { |x| x.uncommitted_events.empty? }
  return if updated_aggregates.empty?

  streams_with_events = updated_aggregates.map do |aggregate|
    [aggregate.event_stream, aggregate.uncommitted_events]
  end
  updated_aggregates.each(&:clear_events)
  store_events command, streams_with_events
end
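
The collection step in commit can be tried in isolation with the sketch below. FakeAggregate and collect_streams_with_events are hypothetical names; the sketch assumes that clear_events replaces the aggregate's event array rather than emptying it in place, and it relies on Ruby's Hash preserving insertion order to get a stable ordering.

```ruby
# Hypothetical stand-in for an aggregate root.
FakeAggregate = Struct.new(:event_stream, :uncommitted_events) do
  # Assumption: clearing swaps in a new array, so pairs collected earlier
  # keep pointing at the events that still have to be stored.
  def clear_events
    self.uncommitted_events = []
  end
end

# Returns [stream, events] pairs for aggregates that have pending events,
# in the order the aggregates were inserted into the hash.
def collect_streams_with_events(aggregates)
  updated = aggregates.values.reject { |a| a.uncommitted_events.empty? }
  pairs = updated.map { |a| [a.event_stream, a.uncommitted_events] }
  updated.each(&:clear_events)
  pairs
end

unit_of_work = {
  'b' => FakeAggregate.new('stream-b', [:created]),
  'a' => FakeAggregate.new('stream-a', []),
  'c' => FakeAggregate.new('stream-c', [:renamed, :closed])
}

collect_streams_with_events(unit_of_work).each do |stream, events|
  puts "#{stream}: #{events.inspect}"
end
```

The aggregate without pending events ('a') is skipped entirely, which matches the early return in commit when nothing was updated.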

#contains_aggregate?(aggregate_id) ⇒ Boolean

Returns whether the event store has both an event stream and events for the given aggregate id.

Returns: (Boolean)



# File 'lib/sequent/core/aggregate_repository.rb', line 135

def contains_aggregate?(aggregate_id)
  Sequent.configuration.event_store.stream_exists?(aggregate_id) &&
    Sequent.configuration.event_store.events_exists?(aggregate_id)
end

#ensure_exists(aggregate_id, clazz) ⇒ Object

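Raises AggregateNotFound if no aggregate with the given id exists, and TypeError if it is not of type clazz. Returns true otherwise.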



# File 'lib/sequent/core/aggregate_repository.rb', line 50

def ensure_exists(aggregate_id, clazz)
  !load_aggregate(aggregate_id, clazz).nil?
end

#load_aggregate(aggregate_id, clazz = nil) ⇒ Object

Loads an aggregate by the given id and class. Returns the one in the current Unit Of Work if present; otherwise loads it from history.



# File 'lib/sequent/core/aggregate_repository.rb', line 56

def load_aggregate(aggregate_id, clazz = nil)
  load_aggregates([aggregate_id], clazz)[0]
end

#load_aggregate_for_snapshotting(aggregate_id, clazz = nil, load_until: nil) ⇒ Object

Optimised for loading lots of events while ignoring snapshot events. To get the correct historical state of an AggregateRoot it is necessary to be able to ignore snapshots. For a nested AggregateRoot no sequence number is known, so a load_until timestamp can be used instead.

aggregate_id: The id of the aggregate to be loaded.

clazz: Optional argument that checks if the aggregate is of type clazz.

load_until: Optional argument that defines up until what point in time the AggregateRoot will be rebuilt.



# File 'lib/sequent/core/aggregate_repository.rb', line 69

def load_aggregate_for_snapshotting(aggregate_id, clazz = nil, load_until: nil)
  fail ArgumentError, 'aggregate_id is required' if aggregate_id.blank?

  stream = Sequent
    .configuration
    .event_store
    .find_event_stream(aggregate_id)
  aggregate = Class.const_get(stream.aggregate_type).stream_from_history(stream)

  Sequent
    .configuration
    .event_store
    .stream_events_for_aggregate(aggregate_id, load_until: load_until) do |event_stream|
      aggregate.stream_from_history(event_stream)
    end

  if clazz
    fail TypeError, "#{aggregate.class} is not a #{clazz}" unless aggregate.class <= clazz
  end
  aggregate
end

#load_aggregates(aggregate_ids, clazz = nil) ⇒ Object

Loads multiple aggregates at once. Returns the ones in the current Unit Of Work if present; otherwise loads them from history.

Note: This will load all the aggregates in memory, so querying 100s of aggregates with 100s of events could cause memory issues.

Returns all aggregates or raises AggregateNotFound. If clazz is given and one of the aggregates is not of the correct type, a TypeError is raised.

aggregate_ids: The ids of the aggregates to be loaded.

clazz: Optional argument that checks if all aggregates are of type clazz.



# File 'lib/sequent/core/aggregate_repository.rb', line 104

def load_aggregates(aggregate_ids, clazz = nil)
  fail ArgumentError, 'aggregate_ids is required' unless aggregate_ids
  return [] if aggregate_ids.empty?

  unique_ids = aggregate_ids.uniq
  result = aggregates.values_at(*unique_ids).compact
  query_ids = unique_ids - result.map(&:id)

  result += Sequent.configuration.event_store.load_events_for_aggregates(query_ids).map do |stream, events|
    aggregate_class = Class.const_get(stream.aggregate_type)
    aggregate_class.load_from_history(stream, events)
  end

  if result.count != unique_ids.count
    missing_aggregate_ids = unique_ids - result.map(&:id)
    fail AggregateNotFound, missing_aggregate_ids
  end

  if clazz
    result.each do |aggregate|
      fail TypeError, "#{aggregate.class} is not a #{clazz}" unless aggregate.class <= clazz
    end
  end

  result.map do |aggregate|
    aggregates[aggregate.id] = aggregate
  end
end
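
The lookup strategy above (serve from the Unit Of Work first, query the store only for the remaining ids, then report what is still missing) can be sketched without an event store. Loaded, MissingAggregates and load_many are hypothetical names, and a plain Hash stands in for the event store. In this sketch, cached aggregates come first in the result, so the returned order can differ from the order of the requested ids.

```ruby
# Hypothetical stand-ins; a Hash plays the role of the event store.
Loaded = Struct.new(:id)

class MissingAggregates < StandardError; end

def load_many(ids, cache, store)
  unique_ids = ids.uniq
  # Serve whatever is already in the unit of work.
  result = cache.values_at(*unique_ids).compact
  # Only the ids that were not cached hit the store.
  query_ids = unique_ids - result.map(&:id)
  result += store.values_at(*query_ids).compact

  missing = unique_ids - result.map(&:id)
  raise MissingAggregates, missing.join(', ') unless missing.empty?

  # Register everything so later loads return the same instances.
  result.each { |aggregate| cache[aggregate.id] = aggregate }
end

cache = { 'b' => Loaded.new('b') }
store = { 'a' => Loaded.new('a') }

puts load_many(%w[a b a], cache, store).map(&:id).inspect # prints: ["b", "a"]

begin
  load_many(%w[unknown], cache, store)
rescue MissingAggregates => e
  puts "not found: #{e.message}"
end
```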