Class: Sequent::Core::AggregateRoot

Inherits:
Object
  • Object
show all
Includes:
Helpers::AutosetAttributes, Helpers::MessageHandler, SnapshotConfiguration
Defined in:
lib/sequent/core/aggregate_root.rb

Overview

Base class for all your domain classes.

Provides load_from_history functionality so that an aggregate can be loaded from its history, meaning a stream of events.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SnapshotConfiguration

included

Methods included from Helpers::AutosetAttributes

included

Methods included from Helpers::MessageHandler

#handle_message, included

Constructor Details

#initialize(id) ⇒ AggregateRoot

Returns a new instance of AggregateRoot.



64
65
66
67
68
69
70
71
# File 'lib/sequent/core/aggregate_root.rb', line 64

# Sets up a new aggregate: stores the id, starts with an empty list of
# uncommitted events and sequence number 1, and creates an EventStream
# describing this aggregate.
#
# @param id the identifier stored in @id and passed to the EventStream as aggregate_id
def initialize(id)
  @id = id
  @uncommitted_events = []
  @sequence_number = 1
  # The stream is tagged with the concrete class name and the class-level
  # snapshot threshold.
  @event_stream = EventStream.new(
    aggregate_type: self.class.name,
    aggregate_id: id,
    snapshot_threshold: self.class.snapshot_default_threshold
  )
end

Instance Attribute Details

#event_stream ⇒ Object (readonly)

Returns the value of attribute event_stream.



43
44
45
# File 'lib/sequent/core/aggregate_root.rb', line 43

# Reader for the @event_stream instance variable, which is assigned by
# #initialize, #initialize_for_streaming and #load_from_history.
#
# @return [Object] the current value of @event_stream
def event_stream
  @event_stream
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



43
44
45
# File 'lib/sequent/core/aggregate_root.rb', line 43

# Reader for the @id instance variable, which is assigned by #initialize,
# #load_from_history and #stream_from_history.
#
# @return [Object] the current value of @id
def id
  @id
end

#sequence_number ⇒ Object (readonly)

Returns the value of attribute sequence_number.



43
44
45
# File 'lib/sequent/core/aggregate_root.rb', line 43

# Reader for the @sequence_number instance variable. It starts at 1 and
# #apply_event sets it to the applied event's sequence number plus one.
#
# @return [Object] the current value of @sequence_number
def sequence_number
  @sequence_number
end

#uncommitted_events ⇒ Object (readonly)

Returns the value of attribute uncommitted_events.



43
44
45
# File 'lib/sequent/core/aggregate_root.rb', line 43

# Reader for the @uncommitted_events instance variable. It is initialised to
# an empty array, reset by #clear_events and appended to by #take_snapshot!.
#
# @return [Object] the current value of @uncommitted_events
def uncommitted_events
  @uncommitted_events
end

Class Method Details

.inherited(subclass) ⇒ Object



45
46
47
48
# File 'lib/sequent/core/aggregate_root.rb', line 45

# Subclass hook: after delegating to the parent implementation, appends the
# new subclass to AggregateRoots.
#
# @param subclass [Class] the class that was just derived from this one
def self.inherited(subclass)
  super(subclass)
  AggregateRoots << subclass
end

.load_from_history(stream, events) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/sequent/core/aggregate_root.rb', line 50

# Rebuilds an aggregate from its events.
#
# When the first event is a SnapshotEvent, the aggregate is deserialized from
# the snapshot's Base64-encoded Marshal data and the remaining events are
# applied on top. Otherwise an instance is allocated without calling
# `initialize` and the full event list is replayed through the instance-level
# #load_from_history.
#
# @param stream the event stream handed to the instance-level #load_from_history
#   (not used on the snapshot path)
# @param events the events to load; the first one may be a SnapshotEvent
# @return the rebuilt aggregate
def self.load_from_history(stream, events)
  snapshot, *later_events = events

  unless snapshot.is_a?(SnapshotEvent)
    fresh = allocate # allocate without calling new
    fresh.load_from_history(stream, events)
    return fresh
  end

  # NOTE(review): Marshal.load is only safe if snapshot data comes from a
  # trusted event store.
  # rubocop:disable Security/MarshalLoad
  restored = Marshal.load(Base64.decode64(snapshot.data))
  # rubocop:enable Security/MarshalLoad
  later_events.each { |event| restored.apply_event(event) }
  restored
end

.stream_from_history(stream) ⇒ Object



97
98
99
100
101
# File 'lib/sequent/core/aggregate_root.rb', line 97

# Allocates an instance without calling `initialize` and prepares it for
# streaming via #initialize_for_streaming.
#
# @param stream the event stream passed to #initialize_for_streaming
# @return the newly allocated aggregate
def self.stream_from_history(stream)
  allocate.tap { |root| root.initialize_for_streaming(stream) }
end

Instance Method Details

#apply_event(event) ⇒ Object



116
117
118
119
# File 'lib/sequent/core/aggregate_root.rb', line 116

# Dispatches the event through handle_message and then advances
# @sequence_number to one past the event's sequence number.
#
# @param event an object responding to `sequence_number`
# @return the new value of @sequence_number
def apply_event(event)
  handle_message(event)
  following_number = event.sequence_number + 1
  @sequence_number = following_number
end

#clear_events ⇒ Object



107
108
109
# File 'lib/sequent/core/aggregate_root.rb', line 107

# Discards the pending events by pointing @uncommitted_events at a new empty
# array; the previous array object itself is not mutated.
#
# @return [Array] the new, empty array
def clear_events
  @uncommitted_events = []
end

#initialize_for_streaming(stream) ⇒ Object



83
84
85
86
87
# File 'lib/sequent/core/aggregate_root.rb', line 83

# Resets the aggregate's bookkeeping for streaming: sequence number 1, no
# uncommitted events, and the given stream as its event stream. @id is not
# touched here.
#
# @param stream the event stream to store in @event_stream
# @return the given stream
def initialize_for_streaming(stream)
  @sequence_number = 1
  @uncommitted_events = []
  @event_stream = stream
end

#load_from_history(stream, events) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/sequent/core/aggregate_root.rb', line 73

# Rebuilds this instance's state by replaying the given events in order.
#
# The id is taken from the first event; the uncommitted events, sequence
# number and event stream are reset before replaying.
#
# @param stream the event stream to store in @event_stream
# @param events a non-empty collection of events, each applied via #apply_event
# @raise [RuntimeError] 'Empty history' when events is empty
# @return the events collection (the result of `each`)
def load_from_history(stream, events)
  raise 'Empty history' if events.empty?

  # Read the id first so a malformed first event fails before any state changes.
  @id = events.first.aggregate_id
  @event_stream = stream
  @sequence_number = 1
  @uncommitted_events = []
  events.each { |past_event| apply_event(past_event) }
end

#stream_from_history(stream_events) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/sequent/core/aggregate_root.rb', line 89

# Applies a single streamed event to this aggregate.
#
# The argument is destructured as a pair; only the second element (the event)
# is used. The aggregate's id is taken from the event only if no id has been
# set yet.
#
# @param stream_events a pair whose second element is the event to apply
# @raise [RuntimeError] 'Empty history' when the event is blank
# @return the result of #apply_event
def stream_from_history(stream_events)
  _ignored_stream, streamed_event = stream_events
  raise 'Empty history' if streamed_event.blank?

  @id = streamed_event.aggregate_id unless @id
  apply_event(streamed_event)
end

#take_snapshot! ⇒ Object



111
112
113
114
# File 'lib/sequent/core/aggregate_root.rb', line 111

# Serializes this aggregate with Marshal, Base64-encodes the result, wraps it
# in a SnapshotEvent built via build_event, and appends that event to the
# uncommitted events.
#
# @return the @uncommitted_events collection after the append
def take_snapshot!
  serialized_state = Base64.encode64(Marshal.dump(self))
  snapshot_event = build_event(SnapshotEvent, data: serialized_state)
  @uncommitted_events << snapshot_event
end

#to_s ⇒ Object



103
104
105
# File 'lib/sequent/core/aggregate_root.rb', line 103

# Human-readable representation: the class name, a colon and a space, then
# the value of @id.
#
# @return [String] e.g. "ClassName: <id>"
def to_s
  "#{self.class.name}: #{@id}"
end