Class: Sequent::Core::AggregateRoot

Inherits:
Object
  • Object
show all
Extended by:
ActiveSupport::DescendantsTracker
Includes:
Helpers::AutosetAttributes, Helpers::MessageHandler, SnapshotConfiguration
Defined in:
lib/sequent/core/aggregate_root.rb

Overview

Base class for all your domain classes.

Provides load_from_history functionality, so that an aggregate can be rebuilt from its history, meaning a stream of events.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SnapshotConfiguration

included

Methods included from Helpers::AutosetAttributes

included

Methods included from Helpers::MessageHandler

#handle_message, included

Constructor Details

#initialize(id) ⇒ AggregateRoot

Returns a new instance of AggregateRoot.



60
61
62
63
64
65
66
67
# File 'lib/sequent/core/aggregate_root.rb', line 60

# Sets up a brand-new aggregate that has no history yet.
#
# The sequence number starts at 1, the list of uncommitted events starts
# empty, and a new EventStream is created that is typed after the concrete
# aggregate class and carries that class's default snapshot threshold.
#
# @param id the identifier stored as this aggregate's id and used as the
#   aggregate_id of the created event stream
def initialize(id)
  @id = id
  @sequence_number = 1
  @uncommitted_events = []
  aggregate_class = self.class
  @event_stream = EventStream.new(
    aggregate_type: aggregate_class.name,
    aggregate_id: id,
    snapshot_threshold: aggregate_class.snapshot_default_threshold
  )
end

Instance Attribute Details

#event_stream ⇒ Object (readonly)

Returns the value of attribute event_stream.



44
45
46
# File 'lib/sequent/core/aggregate_root.rb', line 44

# Read-only accessor for the event stream this aggregate is attached to.
#
# @return [Object] the current value of @event_stream: the EventStream built
#   in #initialize, or the stream that was passed to #load_from_history or
#   #initialize_for_streaming
def event_stream
  @event_stream
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



44
45
46
# File 'lib/sequent/core/aggregate_root.rb', line 44

# Read-only accessor for the aggregate's identifier.
#
# @return [Object] the current value of @id: the constructor argument, or the
#   aggregate_id taken from an event when the aggregate was built from history
def id
  @id
end

#sequence_number ⇒ Object (readonly)

Returns the value of attribute sequence_number.



44
45
46
# File 'lib/sequent/core/aggregate_root.rb', line 44

# Read-only accessor for the aggregate's sequence counter.
#
# @return [Object] the current value of @sequence_number: 1 for an aggregate
#   that has not applied any event, otherwise the sequence_number of the most
#   recently applied event plus one (see #apply_event)
def sequence_number
  @sequence_number
end

#uncommitted_events ⇒ Object (readonly)

Returns the value of attribute uncommitted_events.



44
45
46
# File 'lib/sequent/core/aggregate_root.rb', line 44

# Read-only accessor for the list of events held on this instance.
#
# @return [Array] the current value of @uncommitted_events; #take_snapshot!
#   appends to it and #clear_events replaces it with a new empty array
def uncommitted_events
  @uncommitted_events
end

Class Method Details

.load_from_history(stream, events) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/sequent/core/aggregate_root.rb', line 46

# Builds an aggregate from previously stored events.
#
# When the first event is a SnapshotEvent, the aggregate is restored by
# unmarshalling the snapshot's Base64-encoded data and the remaining events
# are applied on top of it; the +stream+ argument is not used on that path.
# Otherwise an instance is allocated without running #initialize and the
# complete event list is replayed through the instance-level
# #load_from_history.
#
# NOTE(review): Marshal.load executes whatever the payload describes, so the
# snapshot data must only ever come from a trusted event store.
#
# @param stream the event stream handed to the instance when replaying
# @param events [Array] the events to rebuild from, oldest first
# @return [Object] the rebuilt aggregate
def self.load_from_history(stream, events)
  head, *tail = events

  unless head.is_a?(SnapshotEvent)
    replayed = allocate # allocate without calling new
    replayed.load_from_history(stream, events)
    return replayed
  end

  # rubocop:disable Security/MarshalLoad
  restored = Marshal.load(Base64.decode64(head.data))
  # rubocop:enable Security/MarshalLoad
  tail.each { |event| restored.apply_event(event) }
  restored
end

.stream_from_history(stream) ⇒ Object



93
94
95
96
97
# File 'lib/sequent/core/aggregate_root.rb', line 93

# Creates an aggregate prepared to receive its history one event at a time.
#
# The instance is allocated without running #initialize and then set up via
# #initialize_for_streaming with the given stream.
#
# @param stream the event stream to attach to the new instance
# @return [Object] the allocated, streaming-ready aggregate
def self.stream_from_history(stream)
  allocate.tap { |streaming_root| streaming_root.initialize_for_streaming(stream) }
end

Instance Method Details

#apply_event(event) ⇒ Object



112
113
114
115
# File 'lib/sequent/core/aggregate_root.rb', line 112

# Applies a single event to this aggregate.
#
# The event is first dispatched through handle_message; afterwards the
# aggregate's sequence number is moved to one past the event's own
# sequence_number.
#
# @param event an object responding to sequence_number
# @return the new value of the sequence number
def apply_event(event)
  handle_message(event)
  following_sequence_number = event.sequence_number + 1
  @sequence_number = following_sequence_number
end

#clear_events ⇒ Object



103
104
105
# File 'lib/sequent/core/aggregate_root.rb', line 103

# Discards the events currently held in @uncommitted_events.
#
# A new empty array is assigned; the previous array object is not mutated,
# so a reference obtained earlier through #uncommitted_events keeps its
# contents.
#
# @return [Array] the new, empty array
def clear_events
  @uncommitted_events = []
end

#initialize_for_streaming(stream) ⇒ Object



79
80
81
82
83
# File 'lib/sequent/core/aggregate_root.rb', line 79

# Prepares an allocated instance for receiving events one by one.
#
# Resets the sequence number to 1, starts with an empty list of uncommitted
# events and attaches the given stream. The id is not set here.
#
# @param stream the event stream to attach
# @return the stream that was passed in
def initialize_for_streaming(stream)
  @sequence_number = 1
  @uncommitted_events = []
  @event_stream = stream
end

#load_from_history(stream, events) ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/sequent/core/aggregate_root.rb', line 69

# Rebuilds this instance's state by replaying a list of events.
#
# The id is taken from the first event, the sequence number is reset to 1,
# the list of uncommitted events is emptied and the given stream is
# attached; then every event is passed to #apply_event in order.
#
# @param stream the event stream to attach to this instance
# @param events [Array] the events to replay, oldest first
# @return [Array] the +events+ collection that was passed in
# @raise [RuntimeError] with message 'Empty history' when +events+ is empty
def load_from_history(stream, events)
  raise 'Empty history' if events.empty?

  # Read the id first: it is the only step that can raise before replay starts.
  @id = events.first.aggregate_id
  @event_stream = stream
  @sequence_number = 1
  @uncommitted_events = []
  events.each { |historic_event| apply_event(historic_event) }
end

#stream_from_history(stream_events) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/sequent/core/aggregate_root.rb', line 85

# Applies one streamed history entry to this instance.
#
# +stream_events+ is destructured into two parts; only the second part, the
# event, is used. The id is taken from the event only when no id has been
# set yet, after which the event goes through #apply_event.
#
# @param stream_events a two-element collection whose second element is the
#   event to apply
# @return whatever #apply_event returns
# @raise [RuntimeError] with message 'Empty history' when the event is blank
def stream_from_history(stream_events)
  _ignored_stream, streamed_event = stream_events
  raise 'Empty history' if streamed_event.blank?

  @id = streamed_event.aggregate_id unless @id
  apply_event(streamed_event)
end

#take_snapshot! ⇒ Object



107
108
109
110
# File 'lib/sequent/core/aggregate_root.rb', line 107

# Records a snapshot of this aggregate as an uncommitted event.
#
# The whole instance is serialized with Marshal, Base64-encoded and wrapped
# in a SnapshotEvent created through build_event. Because the event is built
# before it is appended, the serialized state does not contain the snapshot
# event itself.
#
# @return [Array] the list of uncommitted events, now ending with the snapshot
def take_snapshot!
  snapshot_event = build_event(SnapshotEvent, data: Base64.encode64(Marshal.dump(self)))
  @uncommitted_events << snapshot_event
end

#to_s ⇒ Object



99
100
101
# File 'lib/sequent/core/aggregate_root.rb', line 99

# Human-readable label for this aggregate.
#
# @return [String] the class name and the aggregate id, separated by ": "
def to_s
  class_name = self.class.name
  "#{class_name}: #{@id}"
end