Class: Sequent::Core::AggregateRepository

Inherits:
Object
Defined in:
lib/sequent/core/aggregate_repository.rb

Overview

Repository for aggregates.

Implements the Unit-Of-Work and Identity-Map patterns to ensure each aggregate is only loaded once per transaction and that you always get the same aggregate instance back.

On commit, all aggregates associated with the Unit-Of-Work are queried for uncommitted events. After these events are persisted, they are cleared from the aggregate.

The repository keeps track of the Unit-Of-Work per thread, so a single repository instance can be shared between threads.
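
The per-thread bookkeeping described above can be illustrated with a minimal sketch. ToyRepository and its KEY are hypothetical stand-ins, not Sequent's classes; the sketch assumes only that the Unit-Of-Work is a Hash kept under a key in Thread.current, created lazily on first use.

```ruby
# Hypothetical stand-in for the repository; not Sequent's actual class.
class ToyRepository
  KEY = :toy_repository_aggregates

  # Each thread lazily gets its own Hash, keyed by aggregate id.
  def aggregates
    Thread.current[KEY] ||= {}
  end

  def add(id, aggregate)
    aggregates[id] = aggregate
  end

  def get(id)
    aggregates[id]
  end
end

repo = ToyRepository.new
repo.add('a-1', Object.new)

# Identity map: repeated lookups in the same thread yield the same object.
same_instance = repo.get('a-1').equal?(repo.get('a-1'))

# A second thread sharing the same repository object starts with an
# empty unit of work, so the lookup there finds nothing.
seen_in_other_thread = Thread.new { repo.get('a-1') }.value
```

Because the state hangs off the thread rather than the repository object, sharing one repository instance does not share aggregates between threads.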

Defined Under Namespace

Classes: AggregateNotFound, HasUncommittedEvents, NonUniqueAggregateId

Constant Summary

AGGREGATES_KEY = 'Sequent::Core::AggregateRepository::aggregates'.to_sym

Key under which the Unit-Of-Work is kept in thread-local storage.

Instance Method Details

#add_aggregate(aggregate) ⇒ Object

Adds the given aggregate to the repository (or unit of work).

Only when commit is called are the aggregates in the unit of work ‘processed’ and their uncommitted_events stored in the event store.

Raises NonUniqueAggregateId if a different instance with the same id is already registered.



# File 'lib/sequent/core/aggregate_repository.rb', line 38

def add_aggregate(aggregate)
  existing = aggregates[aggregate.id]
  if existing && !existing.equal?(aggregate)
    raise NonUniqueAggregateId.new(aggregate, aggregates[aggregate.id])
  else
    aggregates[aggregate.id] = aggregate
  end
end
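
The identity check in add_aggregate can be tried out in isolation. The sketch below is a toy, assuming nothing about Sequent beyond the listing above: ToyAggregate, ToyUnitOfWork and DuplicateAggregateId are hypothetical names. It shows that re-adding the same instance is harmless, while a second instance with the same id is rejected even if it compares equal with ==.

```ruby
# Hypothetical minimal aggregate: only an id.
ToyAggregate = Struct.new(:id)

# Hypothetical error class standing in for NonUniqueAggregateId.
class DuplicateAggregateId < StandardError; end

class ToyUnitOfWork
  def initialize
    @aggregates = {}
  end

  def add_aggregate(aggregate)
    existing = @aggregates[aggregate.id]
    # equal? compares object identity, not value equality.
    if existing && !existing.equal?(aggregate)
      raise DuplicateAggregateId, "id #{aggregate.id} is already registered"
    end
    @aggregates[aggregate.id] = aggregate
  end
end

uow = ToyUnitOfWork.new
first = ToyAggregate.new('a-1')
uow.add_aggregate(first)
uow.add_aggregate(first) # same instance again: accepted

begin
  # A distinct object with the same id (and == to the first) is rejected.
  uow.add_aggregate(ToyAggregate.new('a-1'))
rescue DuplicateAggregateId => e
  rejected_message = e.message
end
```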

#clear ⇒ Object

Clears the Unit of Work for the current thread without checking for uncommitted events.



# File 'lib/sequent/core/aggregate_repository.rb', line 124

def clear
  Thread.current[AGGREGATES_KEY] = nil
end

#clear! ⇒ Object

Clears the Unit of Work.

Raises HasUncommittedEvents when any aggregate in the Unit of Work still has uncommitted_events.



# File 'lib/sequent/core/aggregate_repository.rb', line 131

def clear!
  fail HasUncommittedEvents if aggregates.values.any? { |x| !x.uncommitted_events.empty? }
  clear
end
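
The difference between clear and clear! can be sketched with a toy work set. ToyWorkSet and ToyHasUncommittedEvents are hypothetical names, and the work set is kept in an instance variable rather than in thread-local storage to keep the example short.

```ruby
# Hypothetical error class standing in for HasUncommittedEvents.
class ToyHasUncommittedEvents < StandardError; end

class ToyWorkSet
  def initialize
    @pending = {} # aggregate id => array of uncommitted events
  end

  def record(id, event)
    (@pending[id] ||= []) << event
  end

  def size
    @pending.size
  end

  # Unconditional: drops everything, including unsaved events.
  def clear
    @pending = {}
  end

  # Guarded: refuses to drop unsaved events.
  def clear!
    raise ToyHasUncommittedEvents if @pending.values.any? { |events| !events.empty? }
    clear
  end
end

work = ToyWorkSet.new
work.record('a-1', :created)

begin
  work.clear!
rescue ToyHasUncommittedEvents
  # The guarded variant left the work set untouched.
  still_pending = work.size
end

work.clear # the unguarded variant discards the pending event
```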

#commit(command) ⇒ Object

Gets all uncommitted_events from the ‘registered’ aggregates and stores them in the event store. The command is ‘attached’ for traceability, so we can see which command resulted in which events. Returns without storing anything when no aggregate has uncommitted events.

This is all abstracted away if you use the Sequent::Core::CommandService.



# File 'lib/sequent/core/aggregate_repository.rb', line 113

def commit(command)
  updated_aggregates = aggregates.values.reject { |x| x.uncommitted_events.empty? }
  return if updated_aggregates.empty?
  streams_with_events = updated_aggregates.map do |aggregate|
    [ aggregate.event_stream, aggregate.uncommitted_events ]
  end
  updated_aggregates.each(&:clear_events)
  store_events command, streams_with_events
end
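
The commit flow can be followed end to end with an in-memory stand-in for the event store. Everything named Toy* below, and the toy_commit helper, is hypothetical; in particular the sketch assumes that clear_events replaces the aggregate's event array with a new one, so the events captured beforehand survive the clearing step.

```ruby
# Hypothetical aggregate that records events when it changes.
class ToyCounter
  attr_reader :id, :uncommitted_events

  def initialize(id)
    @id = id
    @uncommitted_events = []
  end

  def increment
    @uncommitted_events << { type: :incremented, aggregate_id: id }
  end

  # Assumption: swaps in a fresh array instead of emptying the old one.
  def clear_events
    @uncommitted_events = []
  end
end

# Hypothetical event store that only remembers what it was given.
class ToyEventStore
  attr_reader :committed

  def initialize
    @committed = []
  end

  def commit_events(command, streams_with_events)
    @committed << [command, streams_with_events]
  end
end

def toy_commit(command, aggregates, store)
  updated = aggregates.reject { |a| a.uncommitted_events.empty? }
  return if updated.empty?

  # Capture the events first, then clear them, then hand them to the store.
  streams_with_events = updated.map { |a| [a.id, a.uncommitted_events] }
  updated.each(&:clear_events)
  store.commit_events(command, streams_with_events)
end

store = ToyEventStore.new
changed = ToyCounter.new('c-1')
changed.increment
untouched = ToyCounter.new('c-2')

# Only 'c-1' has pending events, so only its events reach the store.
toy_commit(:increment_command, [changed, untouched], store)
```

The sketch keeps the ordering of the listing above: events are cleared from the aggregates before the store is called.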

#contains_aggregate?(aggregate_id) ⇒ Boolean

Returns whether the event store has an event stream for the given aggregate id.

Returns:
(Boolean)



# File 'lib/sequent/core/aggregate_repository.rb', line 102

def contains_aggregate?(aggregate_id)
  Sequent.configuration.event_store.stream_exists?(aggregate_id)
end

#ensure_exists(aggregate_id, clazz) ⇒ Object

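Raises AggregateNotFound if no aggregate with the given id exists, and TypeError if it is not of type clazz. Otherwise returns true.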



# File 'lib/sequent/core/aggregate_repository.rb', line 48

def ensure_exists(aggregate_id, clazz)
  !load_aggregate(aggregate_id, clazz).nil?
end

#load_aggregate(aggregate_id, clazz = nil) ⇒ Object

Loads the aggregate with the given id, optionally checking that it is of type clazz. Returns the instance in the current Unit Of Work if there is one; otherwise loads it from history.



# File 'lib/sequent/core/aggregate_repository.rb', line 54

def load_aggregate(aggregate_id, clazz = nil)
  load_aggregates([aggregate_id], clazz)[0]
end

#load_aggregates(aggregate_ids, clazz = nil) ⇒ Object

Loads multiple aggregates at once. Returns the instances already in the current Unit Of Work and loads the remaining ones from history.

Note: This loads all requested aggregates into memory, so querying hundreds of aggregates with hundreds of events each could cause memory issues.

Returns all aggregates or raises AggregateNotFound. If clazz is given and one of the aggregates is not of the correct type, a TypeError is raised.

aggregate_ids: The ids of the aggregates to be loaded.
clazz: Optional argument that checks if all aggregates are of type clazz.



# File 'lib/sequent/core/aggregate_repository.rb', line 71

def load_aggregates(aggregate_ids, clazz = nil)
  fail ArgumentError.new('aggregate_ids is required') unless aggregate_ids
  return [] if aggregate_ids.empty?

  _aggregate_ids = aggregate_ids.uniq
  _aggregates = aggregates.values_at(*_aggregate_ids).compact
  _query_ids = _aggregate_ids - _aggregates.map(&:id)

  _aggregates += Sequent.configuration.event_store.load_events_for_aggregates(_query_ids).map do |stream, events|
    aggregate_class = Class.const_get(stream.aggregate_type)
    aggregate_class.load_from_history(stream, events)
  end

  if _aggregates.count != _aggregate_ids.count
    missing_aggregate_ids = _aggregate_ids - _aggregates.map(&:id)
    raise AggregateNotFound.new(missing_aggregate_ids)
  end

  if clazz
    _aggregates.each do |aggregate|
      raise TypeError, "#{aggregate.class} is not a #{clazz}" if !(aggregate.class <= clazz)
    end
  end

  _aggregates.map do |aggregate|
    aggregates[aggregate.id] = aggregate
  end
end
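
The lookup order in load_aggregates (Unit Of Work first, event store for the rest, then the existence and type checks) can be sketched without an event store. In the toy below, two plain Hashes stand in for the Unit Of Work and for stored history; toy_load, ToyNotFound, ToyUser and ToyInvoice are hypothetical names.

```ruby
# Hypothetical error class standing in for AggregateNotFound.
class ToyNotFound < StandardError; end

# Hypothetical aggregate types.
ToyUser = Struct.new(:id)
ToyInvoice = Struct.new(:id)

def toy_load(ids, cached, stored, clazz = nil)
  unique_ids = ids.uniq

  # 1. Take whatever the unit of work already holds.
  found = cached.values_at(*unique_ids).compact

  # 2. Query the store only for the ids that are still missing.
  query_ids = unique_ids - found.map(&:id)
  found += stored.values_at(*query_ids).compact

  # 3. Every requested id must be accounted for.
  missing = unique_ids - found.map(&:id)
  raise ToyNotFound, "missing: #{missing.join(', ')}" unless missing.empty?

  # 4. Optional type check; Module#<= is true for the class itself
  #    and its subclasses, and nil for unrelated classes.
  if clazz
    found.each do |aggregate|
      raise TypeError, "#{aggregate.class} is not a #{clazz}" unless aggregate.class <= clazz
    end
  end

  # 5. Register everything so later loads return the same instances.
  found.each { |aggregate| cached[aggregate.id] = aggregate }
end

cached = { 'u-1' => ToyUser.new('u-1') }
stored = { 'u-2' => ToyUser.new('u-2'), 'i-1' => ToyInvoice.new('i-1') }

users = toy_load(%w[u-2 u-1 u-2], cached, stored, ToyUser)
```

As in the listing above, duplicates in the requested ids are collapsed, and the result lists the aggregates found in the Unit Of Work before the freshly loaded ones rather than following the order of the ids passed in.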