Class: Sequent::Core::AggregateRoot
Overview
Base class for all your domain classes.
Provides load_from_history functionality, so an aggregate can be rebuilt from its history, meaning a stream of events.
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
included
included
#dispatch_message, #handle_message, included
Constructor Details
Returns a new instance of AggregateRoot.
61
62
63
64
65
|
# File 'lib/sequent/core/aggregate_root.rb', line 61
# Creates an aggregate root identified by +id+, starting with no uncommitted
# events and a sequence number of 1.
#
# @param id [Object] identifier stored in @id
def initialize(id)
  @uncommitted_events = []
  @id = id
  @sequence_number = 1
end
|
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
43
44
45
|
# File 'lib/sequent/core/aggregate_root.rb', line 43
# Returns the value currently stored in @id.
def id = @id
|
#latest_snapshot_sequence_number ⇒ Object
Returns the value of attribute latest_snapshot_sequence_number.
44
45
46
|
# File 'lib/sequent/core/aggregate_root.rb', line 44
# Returns the value currently stored in @latest_snapshot_sequence_number
# (nil when it has never been assigned).
def latest_snapshot_sequence_number = @latest_snapshot_sequence_number
|
#sequence_number ⇒ Object
Returns the value of attribute sequence_number.
43
44
45
|
# File 'lib/sequent/core/aggregate_root.rb', line 43
# Returns the value currently stored in @sequence_number.
def sequence_number = @sequence_number
|
#uncommitted_events ⇒ Object
Returns the value of attribute uncommitted_events.
43
44
45
|
# File 'lib/sequent/core/aggregate_root.rb', line 43
# Returns the object currently stored in @uncommitted_events (the same
# instance, not a copy).
def uncommitted_events = @uncommitted_events
|
Class Method Details
.load_from_history(stream, events) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/sequent/core/aggregate_root.rb', line 46
# Rebuilds an aggregate root from its history.
#
# When the first event is a SnapshotEvent, the aggregate is restored from the
# snapshot payload, its latest_snapshot_sequence_number is set to the
# snapshot's sequence number, and only the remaining events are applied.
# Otherwise a blank instance is allocated (without running #initialize) and
# the complete history is replayed via the instance-level #load_from_history.
#
# @param stream [Object] forwarded to the instance-level #load_from_history
#   in the non-snapshot case; unused when restoring from a snapshot
# @param events [Array] the history, optionally starting with a SnapshotEvent
# @return [Object] the rebuilt aggregate root
def self.load_from_history(stream, events)
  first, *rest = events
  if first.is_a? SnapshotEvent
    # NOTE(review): Marshal.load can instantiate arbitrary objects; this is
    # only safe while snapshot data comes from a trusted store - confirm.
    aggregate_root = Marshal.load(Base64.decode64(first.data))
    aggregate_root.latest_snapshot_sequence_number = first.sequence_number
    rest.each { |x| aggregate_root.apply_event(x) }
  else
    # Two separate statements: allocate first, then replay the history on
    # the freshly allocated instance.
    aggregate_root = allocate
    aggregate_root.load_from_history(stream, events)
  end
  aggregate_root
end
|
.stream_from_history(stream) ⇒ Object
91
92
93
94
95
|
# File 'lib/sequent/core/aggregate_root.rb', line 91
# Allocates a blank aggregate root (without running #initialize), prepares it
# for streaming with +stream+, and returns it.
#
# @param stream [Object] passed to #initialize_for_streaming
# @return [Object] the allocated aggregate root
def self.stream_from_history(stream)
  allocate.tap { |blank| blank.initialize_for_streaming(stream) }
end
|
Instance Method Details
#apply_event(event) ⇒ Object
135
136
137
138
|
# File 'lib/sequent/core/aggregate_root.rb', line 135
# Dispatches +event+ to handle_message and then advances @sequence_number to
# one past the event's sequence number.
#
# @param event [Object] must respond to #sequence_number
# @return [Object] the new value of @sequence_number
def apply_event(event)
  handle_message(event)
  following_number = event.sequence_number + 1
  @sequence_number = following_number
end
|
#clear_events ⇒ Object
121
122
123
|
# File 'lib/sequent/core/aggregate_root.rb', line 121
# Replaces @uncommitted_events with a fresh empty array. The previous array
# object is left untouched.
#
# @return [Array] the new empty array
def clear_events = (@uncommitted_events = [])
|
#event_stream ⇒ Object
101
102
103
104
105
106
107
108
|
# File 'lib/sequent/core/aggregate_root.rb', line 101
# Builds an EventStream describing this aggregate: its class name, id,
# events partition key, and - only when the snapshot is outdated - the
# current time as snapshot_outdated_at (nil otherwise).
#
# @return [EventStream]
def event_stream
  outdated_at = (Time.now if snapshot_outdated?)

  EventStream.new(
    aggregate_type: self.class.name,
    aggregate_id: id,
    events_partition_key: events_partition_key,
    snapshot_outdated_at: outdated_at,
  )
end
|
#events_partition_key ⇒ Object
Provide the partitioning key for storing events. This value must be a string and will be used by PostgreSQL to store the events in the right partition.
The value may change over the lifetime of the aggregate; when it does, old events will be moved to the correct partition, which can be an expensive database operation.
117
118
119
|
# File 'lib/sequent/core/aggregate_root.rb', line 117
# Partitioning key used when storing this aggregate's events. This default
# implementation returns nil.
#
# @return [nil]
def events_partition_key = nil
|
#initialize_for_streaming(stream) ⇒ Object
77
78
79
80
81
|
# File 'lib/sequent/core/aggregate_root.rb', line 77
# Resets the aggregate for streaming: sequence number 1, no uncommitted
# events, and +stream+ stored in @event_stream.
#
# @param stream [Object] value stored in @event_stream
# @return [Object] +stream+
def initialize_for_streaming(stream)
  @sequence_number = 1
  @uncommitted_events = []
  @event_stream = stream
end
|
#load_from_history(stream, events) ⇒ Object
67
68
69
70
71
72
73
74
75
|
# File 'lib/sequent/core/aggregate_root.rb', line 67
# Initialises this instance from a non-empty history: takes the id from the
# first event, resets the uncommitted events and sequence number, stores
# +stream+ in @event_stream, and then applies every event in order.
#
# @param stream [Object] value stored in @event_stream
# @param events [Array] events to replay; each is passed to #apply_event
# @return [Array] +events+
# @raise [RuntimeError] 'Empty history' when +events+ is empty
def load_from_history(stream, events)
  raise 'Empty history' if events.empty?

  @id = events.first.aggregate_id
  @sequence_number = 1
  @uncommitted_events = []
  @event_stream = stream
  events.each { |past_event| apply_event(past_event) }
end
|
#snapshot_outdated? ⇒ Boolean
125
126
127
128
129
|
# File 'lib/sequent/core/aggregate_root.rb', line 125
# Tells whether the number of events since the latest snapshot has reached
# the class-level snapshot_default_threshold. The count is @sequence_number
# minus the latest snapshot sequence number (1 when there is none). Always
# false when no threshold is present.
#
# @return [Boolean]
def snapshot_outdated?
  threshold = self.class.snapshot_default_threshold
  baseline = latest_snapshot_sequence_number || 1
  events_since_snapshot = @sequence_number - baseline
  return false unless threshold.present?

  events_since_snapshot >= threshold
end
|
#stream_from_history(stream_events) ⇒ Object
83
84
85
86
87
88
89
|
# File 'lib/sequent/core/aggregate_root.rb', line 83
# Applies a single streamed event. +stream_events+ is destructured into a
# (discarded) stream and an event; the aggregate id is taken from the event
# only if @id is not already set.
#
# @param stream_events [Array] pair whose second element is the event
# @return [Object] whatever #apply_event returns
# @raise [RuntimeError] 'Empty history' when the event is blank
def stream_from_history(stream_events)
  _discarded_stream, streamed_event = stream_events
  raise 'Empty history' if streamed_event.blank?

  @id = streamed_event.aggregate_id unless @id
  apply_event(streamed_event)
end
|
#take_snapshot ⇒ Object
131
132
133
|
# File 'lib/sequent/core/aggregate_root.rb', line 131
# Builds a SnapshotEvent whose data is this object serialised with
# Marshal.dump and then Base64-encoded.
#
# @return [Object] whatever build_event returns
def take_snapshot
  serialized = Marshal.dump(self)
  encoded = Base64.encode64(serialized)
  build_event SnapshotEvent, data: encoded
end
|
#to_s ⇒ Object
97
98
99
|
# File 'lib/sequent/core/aggregate_root.rb', line 97
# Human-readable representation: the class name, a colon and a space,
# followed by the value of @id.
#
# @return [String]
def to_s
  type_name = self.class.name
  "#{type_name}: #{@id}"
end
|