Module: Sequent::Core::SnapshotStore
- Includes:
- Helpers::PgsqlHelpers
- Included in:
- EventStore
- Defined in:
- lib/sequent/core/snapshot_store.rb
Instance Method Summary collapse
-
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
-
#clear_aggregate_for_snapshotting(aggregate_id) ⇒ Object
Stops snapshotting the specified aggregate.
-
#clear_aggregates_for_snapshotting_with_last_event_before(timestamp) ⇒ Object
Stops snapshotting all aggregates where the last event occurred before the indicated timestamp.
-
#delete_all_snapshots ⇒ Object
Deletes all snapshots for all aggregates.
-
#delete_snapshots_before(aggregate_id, sequence_number) ⇒ Object
Deletes all snapshots for aggregate_id with a sequence_number lower than the specified sequence number.
- #load_latest_snapshot(aggregate_id) ⇒ Object
-
#mark_aggregate_for_snapshotting(aggregate_id, snapshot_outdated_at: Time.now) ⇒ Object
Marks an aggregate for snapshotting.
- #select_aggregates_for_snapshotting(limit:, reschedule_snapshots_scheduled_before: nil) ⇒ Object
- #store_snapshots(snapshots) ⇒ Object
Methods included from Helpers::PgsqlHelpers
#call_procedure, #query_function
Instance Method Details
#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object
Returns the ids of aggregates that need a new snapshot.
85 86 87 88 89 90 91 92 93 |
# File 'lib/sequent/core/snapshot_store.rb', line 85 def aggregates_that_need_snapshots(last_aggregate_id, limit = 10) query_function( connection, 'aggregates_that_need_snapshots', [last_aggregate_id, limit], columns: ['aggregate_id'], ) .pluck('aggregate_id') end |
#clear_aggregate_for_snapshotting(aggregate_id) ⇒ Object
Stops snapshotting the specified aggregate. Any existing snapshots for this aggregate are also deleted.
61 62 63 64 65 66 67 |
# File 'lib/sequent/core/snapshot_store.rb', line 61 def clear_aggregate_for_snapshotting(aggregate_id) connection.exec_update( 'DELETE FROM aggregates_that_need_snapshots WHERE aggregate_id = $1', 'clear_aggregate_for_snapshotting', [aggregate_id], ) end |
#clear_aggregates_for_snapshotting_with_last_event_before(timestamp) ⇒ Object
Stops snapshotting all aggregates where the last event occurred before the indicated timestamp. Any existing snapshots for this aggregate are also deleted.
72 73 74 75 76 77 78 79 80 |
# File 'lib/sequent/core/snapshot_store.rb', line 72 def clear_aggregates_for_snapshotting_with_last_event_before() connection.exec_update(<<~EOS, 'clear_aggregates_for_snapshotting_with_last_event_before', []) DELETE FROM aggregates_that_need_snapshots s WHERE NOT EXISTS (SELECT * FROM aggregates a JOIN events e ON (a.aggregate_id, a.events_partition_key) = (e.aggregate_id, e.partition_key) WHERE a.aggregate_id = s.aggregate_id AND e.created_at >= $1) EOS end |
#delete_all_snapshots ⇒ Object
Deletes all snapshots for all aggregates
33 34 35 |
# File 'lib/sequent/core/snapshot_store.rb', line 33 def delete_all_snapshots call_procedure(connection, 'delete_all_snapshots', [Time.now]) end |
#delete_snapshots_before(aggregate_id, sequence_number) ⇒ Object
Deletes all snapshots for aggregate_id with a sequence_number lower than the specified sequence number.
38 39 40 |
# File 'lib/sequent/core/snapshot_store.rb', line 38 def delete_snapshots_before(aggregate_id, sequence_number) call_procedure(connection, 'delete_snapshots_before', [aggregate_id, sequence_number, Time.now]) end |
#load_latest_snapshot(aggregate_id) ⇒ Object
27 28 29 30 |
# File 'lib/sequent/core/snapshot_store.rb', line 27 def load_latest_snapshot(aggregate_id) snapshot_hash = query_function(connection, 'load_latest_snapshot', [aggregate_id]).first deserialize_event(snapshot_hash) unless snapshot_hash['aggregate_id'].nil? end |
#mark_aggregate_for_snapshotting(aggregate_id, snapshot_outdated_at: Time.now) ⇒ Object
Marks an aggregate for snapshotting. Marked aggregates will be picked up by the background snapshotting task. Another way to mark aggregates for snapshotting is to pass the EventStream#snapshot_outdated_at property to the #store_events
method as is done automatically by the AggregateRepository
based on the aggregate’s snapshot_threshold
.
49 50 51 52 53 54 55 56 57 |
# File 'lib/sequent/core/snapshot_store.rb', line 49 def mark_aggregate_for_snapshotting(aggregate_id, snapshot_outdated_at: Time.now) connection.exec_update(<<~EOS, 'mark_aggregate_for_snapshotting', [aggregate_id, snapshot_outdated_at]) INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_outdated_at) VALUES ($1, $2) ON CONFLICT (aggregate_id) DO UPDATE SET snapshot_outdated_at = LEAST(row.snapshot_outdated_at, EXCLUDED.snapshot_outdated_at), snapshot_scheduled_at = NULL EOS end |
#select_aggregates_for_snapshotting(limit:, reschedule_snapshots_scheduled_before: nil) ⇒ Object
95 96 97 98 99 100 101 102 |
# File 'lib/sequent/core/snapshot_store.rb', line 95 def select_aggregates_for_snapshotting(limit:, reschedule_snapshots_scheduled_before: nil) query_function( connection, 'select_aggregates_for_snapshotting', [limit, reschedule_snapshots_scheduled_before, Time.now], columns: ['aggregate_id'], ).pluck('aggregate_id') end |
#store_snapshots(snapshots) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/sequent/core/snapshot_store.rb', line 11 def store_snapshots(snapshots) json = Sequent::Core::Oj.dump( snapshots.map do |snapshot| { aggregate_id: snapshot.aggregate_id, sequence_number: snapshot.sequence_number, created_at: snapshot.created_at, snapshot_type: snapshot.class.name, snapshot_json: snapshot, } end, ) call_procedure(connection, 'store_snapshots', [json]) end |