Class: Sequent::Core::EventStore
- Inherits:
-
Object
- Object
- Sequent::Core::EventStore
- Extended by:
- Forwardable
- Includes:
- ActiveRecord::ConnectionAdapters::Quoting, Helpers::PgsqlHelpers, SnapshotStore
- Defined in:
- lib/sequent/core/event_store.rb
Defined Under Namespace
Classes: DeserializeEventError, OptimisticLockingError
Constant Summary collapse
- PRINT_PROGRESS =
->(progress, done, _) do next unless Sequent.logger.debug? if done Sequent.logger.debug("Done replaying #{progress} events") else Sequent.logger.debug("Replayed #{progress} events") end end
Instance Method Summary collapse
-
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
- #events_exists?(aggregate_id) ⇒ Boolean
- #find_event_stream(aggregate_id) ⇒ Object
- #load_event(aggregate_id, sequence_number) ⇒ Object
-
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present.
- #load_events_for_aggregates(aggregate_ids) ⇒ Object
- #permanently_delete_commands_without_events(aggregate_id: nil, organization_id: nil) ⇒ Object
- #permanently_delete_event_stream(aggregate_id) ⇒ Object
- #permanently_delete_event_streams(aggregate_ids) ⇒ Object
-
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
-
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an ‘EventRecord` cursor from the given block.
-
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshots.
- #stream_exists?(aggregate_id) ⇒ Boolean
Methods included from SnapshotStore
#aggregates_that_need_snapshots, #clear_aggregate_for_snapshotting, #clear_aggregates_for_snapshotting_with_last_event_before, #delete_all_snapshots, #delete_snapshots_before, #load_latest_snapshot, #mark_aggregate_for_snapshotting, #select_aggregates_for_snapshotting, #store_snapshots
Methods included from Helpers::PgsqlHelpers
#call_procedure, #query_function
Instance Method Details
#commit_events(command, streams_with_events) ⇒ Object
Stores the events in the EventStore and publishes the events to the registered event_handlers.
The events are published according to the order in the tail of the given ‘streams_with_events` array pair.
45 46 47 48 49 50 51 52 |
# File 'lib/sequent/core/event_store.rb', line 45 def commit_events(command, streams_with_events) fail ArgumentError, 'command is required' if command.nil? Sequent.logger.debug("[EventStore] Committing events for command #{command.class}") store_events(command, streams_with_events) publish_events(streams_with_events.flat_map { |_, events| events }) end |
#events_exists?(aggregate_id) ⇒ Boolean
123 124 125 |
# File 'lib/sequent/core/event_store.rb', line 123 def events_exists?(aggregate_id) Sequent.configuration.event_record_class.exists?(aggregate_id: aggregate_id) end |
#find_event_stream(aggregate_id) ⇒ Object
173 174 175 176 |
# File 'lib/sequent/core/event_store.rb', line 173 def find_event_stream(aggregate_id) record = Sequent.configuration.stream_record_class.where(aggregate_id: aggregate_id).first record&.event_stream end |
#load_event(aggregate_id, sequence_number) ⇒ Object
88 89 90 91 |
# File 'lib/sequent/core/event_store.rb', line 88 def load_event(aggregate_id, sequence_number) event_hash = query_function(connection, 'load_event', [aggregate_id, sequence_number]).first deserialize_event(event_hash) if event_hash end |
#load_events(aggregate_id) ⇒ Object
Returns all events for the aggregate ordered by sequence_number, loading them from the latest snapshot event onwards, if a snapshot is present
97 98 99 |
# File 'lib/sequent/core/event_store.rb', line 97 def load_events(aggregate_id) load_events_for_aggregates([aggregate_id])[0] end |
#load_events_for_aggregates(aggregate_ids) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/sequent/core/event_store.rb', line 101 def load_events_for_aggregates(aggregate_ids) return [] if aggregate_ids.none? query_events(aggregate_ids) .group_by { |row| row['aggregate_id'] } .values .map do |rows| [ EventStream.new( aggregate_type: rows.first['aggregate_type'], aggregate_id: rows.first['aggregate_id'], events_partition_key: rows.first['events_partition_key'], ), rows.map { |row| deserialize_event(row) }, ] end end |
#permanently_delete_commands_without_events(aggregate_id: nil, organization_id: nil) ⇒ Object
186 187 188 189 190 191 192 |
# File 'lib/sequent/core/event_store.rb', line 186 def permanently_delete_commands_without_events(aggregate_id: nil, organization_id: nil) unless aggregate_id || organization_id fail ArgumentError, 'aggregate_id and/or organization_id must be specified' end call_procedure(connection, 'permanently_delete_commands_without_events', [aggregate_id, organization_id]) end |
#permanently_delete_event_stream(aggregate_id) ⇒ Object
178 179 180 |
# File 'lib/sequent/core/event_store.rb', line 178 def permanently_delete_event_stream(aggregate_id) permanently_delete_event_streams([aggregate_id]) end |
#permanently_delete_event_streams(aggregate_ids) ⇒ Object
182 183 184 |
# File 'lib/sequent/core/event_store.rb', line 182 def permanently_delete_event_streams(aggregate_ids) call_procedure(connection, 'permanently_delete_event_streams', [aggregate_ids.to_json]) end |
#replay_events ⇒ Object
Replays all events in the event store to the registered event_handlers.
DEPRECATED: use replay_events_from_cursor
instead.
132 133 134 135 136 |
# File 'lib/sequent/core/event_store.rb', line 132 def replay_events warn '[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`' events = yield.map { |event_hash| deserialize_event(event_hash) } publish_events(events) end |
#replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) ⇒ Object
Replays all events on an ‘EventRecord` cursor from the given block.
Prefer this replay method if your db adapter supports cursors.
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/sequent/core/event_store.rb', line 145 def replay_events_from_cursor(get_events:, block_size: 2000, on_progress: PRINT_PROGRESS) progress = 0 cursor = get_events.call ids_replayed = [] cursor.each_row(block_size: block_size).each do |record| event = deserialize_event(record) publish_events([event]) progress += 1 ids_replayed << record['aggregate_id'] if progress % block_size == 0 on_progress[progress, false, ids_replayed] ids_replayed.clear end end on_progress[progress, true, ids_replayed] end |
#stream_events_for_aggregate(aggregate_id, load_until: nil, &block) ⇒ Object
Returns all events for the AggregateRoot ordered by sequence_number, disregarding snapshots.
This streaming is done in batches to prevent loading many events in memory all at once. A usecase for ignoring the snapshots is when events of a nested AggregateRoot need to be loaded up until a certain moment in time.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/sequent/core/event_store.rb', line 63 def stream_events_for_aggregate(aggregate_id, load_until: nil, &block) stream = find_event_stream(aggregate_id) fail ArgumentError, 'no stream found for this aggregate' if stream.blank? has_events = false # PostgreSQLCursor::Cursor does not support bind parameters, so bind parameters manually instead. sql = ActiveRecord::Base.sanitize_sql_array( [ 'SELECT * FROM load_events(:aggregate_ids, FALSE, :load_until)', { aggregate_ids: [aggregate_id].to_json, load_until: load_until, }, ], ) PostgreSQLCursor::Cursor.new(sql, {connection: connection}).each_row do |event_hash| has_events = true event = deserialize_event(event_hash) block.call([stream, event]) end fail ArgumentError, 'no events for this aggregate' unless has_events end |
#stream_exists?(aggregate_id) ⇒ Boolean
119 120 121 |
# File 'lib/sequent/core/event_store.rb', line 119 def stream_exists?(aggregate_id) Sequent.configuration.stream_record_class.exists?(aggregate_id: aggregate_id) end |