Class: Sequent::Core::AggregateRoot

Inherits:
Object
  • Object
show all
Extended by:
ActiveSupport::DescendantsTracker
Includes:
Helpers::AutosetAttributes, Helpers::MessageHandler, SnapshotConfiguration
Defined in:
lib/sequent/core/aggregate_root.rb

Overview

Base class for all your domain classes.

load_from_history functionality to be loaded_from_history, meaning a stream of events.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SnapshotConfiguration

included

Methods included from Helpers::AutosetAttributes

included

Methods included from Helpers::MessageHandler

#dispatch_message, #handle_message, included

Constructor Details

#initialize(id) ⇒ AggregateRoot

Returns a new instance of AggregateRoot.



61
62
63
64
65
# File 'lib/sequent/core/aggregate_root.rb', line 61

def initialize(id)
  @id = id
  @uncommitted_events = []
  @sequence_number = 1
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



43
44
45
# File 'lib/sequent/core/aggregate_root.rb', line 43

def id
  @id
end

#latest_snapshot_sequence_numberObject

Returns the value of attribute latest_snapshot_sequence_number.



44
45
46
# File 'lib/sequent/core/aggregate_root.rb', line 44

def latest_snapshot_sequence_number
  @latest_snapshot_sequence_number
end

#sequence_numberObject (readonly)

Returns the value of attribute sequence_number.



43
44
45
# File 'lib/sequent/core/aggregate_root.rb', line 43

def sequence_number
  @sequence_number
end

#uncommitted_eventsObject (readonly)

Returns the value of attribute uncommitted_events.



43
44
45
# File 'lib/sequent/core/aggregate_root.rb', line 43

def uncommitted_events
  @uncommitted_events
end

Class Method Details

.load_from_history(stream, events) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sequent/core/aggregate_root.rb', line 46

def self.load_from_history(stream, events)
  first, *rest = events
  if first.is_a? SnapshotEvent
    # rubocop:disable Security/MarshalLoad
    aggregate_root = Marshal.load(Base64.decode64(first.data))
    # rubocop:enable Security/MarshalLoad
    aggregate_root.latest_snapshot_sequence_number = first.sequence_number
    rest.each { |x| aggregate_root.apply_event(x) }
  else
    aggregate_root = allocate # allocate without calling new
    aggregate_root.load_from_history(stream, events)
  end
  aggregate_root
end

.stream_from_history(stream) ⇒ Object



91
92
93
94
95
# File 'lib/sequent/core/aggregate_root.rb', line 91

def self.stream_from_history(stream)
  aggregate_root = allocate
  aggregate_root.initialize_for_streaming(stream)
  aggregate_root
end

Instance Method Details

#apply_event(event) ⇒ Object



135
136
137
138
# File 'lib/sequent/core/aggregate_root.rb', line 135

def apply_event(event)
  handle_message(event)
  @sequence_number = event.sequence_number + 1
end

#clear_eventsObject



121
122
123
# File 'lib/sequent/core/aggregate_root.rb', line 121

def clear_events
  @uncommitted_events = []
end

#event_streamObject



101
102
103
104
105
106
107
108
# File 'lib/sequent/core/aggregate_root.rb', line 101

def event_stream
  EventStream.new(
    aggregate_type: self.class.name,
    aggregate_id: id,
    events_partition_key: events_partition_key,
    snapshot_outdated_at: snapshot_outdated? ? Time.now : nil,
  )
end

#events_partition_keyObject

Provide the partitioning key for storing events. This value must be a string and will be used by PostgreSQL to store the events in the right partition.

The value may change over the lifetime of the aggregate, old events will be moved to the correct partition after a change. This can be an expensive database operation.



117
118
119
# File 'lib/sequent/core/aggregate_root.rb', line 117

def events_partition_key
  nil
end

#initialize_for_streaming(stream) ⇒ Object



77
78
79
80
81
# File 'lib/sequent/core/aggregate_root.rb', line 77

def initialize_for_streaming(stream)
  @uncommitted_events = []
  @sequence_number = 1
  @event_stream = stream
end

#load_from_history(stream, events) ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/sequent/core/aggregate_root.rb', line 67

def load_from_history(stream, events)
  fail 'Empty history' if events.empty?

  @id = events.first.aggregate_id
  @uncommitted_events = []
  @sequence_number = 1
  @event_stream = stream
  events.each { |event| apply_event(event) }
end

#snapshot_outdated?Boolean

Returns:



125
126
127
128
129
# File 'lib/sequent/core/aggregate_root.rb', line 125

def snapshot_outdated?
  snapshot_threshold = self.class.snapshot_default_threshold
  events_since_latest_snapshot = @sequence_number - (latest_snapshot_sequence_number || 1)
  snapshot_threshold.present? && events_since_latest_snapshot >= snapshot_threshold
end

#stream_from_history(stream_events) ⇒ Object



83
84
85
86
87
88
89
# File 'lib/sequent/core/aggregate_root.rb', line 83

def stream_from_history(stream_events)
  _stream, event = stream_events
  fail 'Empty history' if event.blank?

  @id ||= event.aggregate_id
  apply_event(event)
end

#take_snapshotObject



131
132
133
# File 'lib/sequent/core/aggregate_root.rb', line 131

def take_snapshot
  build_event SnapshotEvent, data: Base64.encode64(Marshal.dump(self))
end

#to_sObject



97
98
99
# File 'lib/sequent/core/aggregate_root.rb', line 97

def to_s
  "#{self.class.name}: #{@id}"
end