Module: Sequent::Core::SnapshotStore

Includes:
Helpers::PgsqlHelpers
Included in:
EventStore
Defined in:
lib/sequent/core/snapshot_store.rb

Instance Method Summary collapse

Methods included from Helpers::PgsqlHelpers

#call_procedure, #query_function

Instance Method Details

#aggregates_that_need_snapshots(last_aggregate_id, limit = 10) ⇒ Object

Returns the ids of aggregates that need a new snapshot.



85
86
87
88
89
90
91
92
93
# File 'lib/sequent/core/snapshot_store.rb', line 85

def aggregates_that_need_snapshots(last_aggregate_id, limit = 10)
  query_function(
    connection,
    'aggregates_that_need_snapshots',
    [last_aggregate_id, limit],
    columns: ['aggregate_id'],
  )
    .pluck('aggregate_id')
end

#clear_aggregate_for_snapshotting(aggregate_id) ⇒ Object

Stops snapshotting the specified aggregate. Any existing snapshots for this aggregate are also deleted.



61
62
63
64
65
66
67
# File 'lib/sequent/core/snapshot_store.rb', line 61

def clear_aggregate_for_snapshotting(aggregate_id)
  connection.exec_update(
    'DELETE FROM aggregates_that_need_snapshots WHERE aggregate_id = $1',
    'clear_aggregate_for_snapshotting',
    [aggregate_id],
  )
end

#clear_aggregates_for_snapshotting_with_last_event_before(timestamp) ⇒ Object

Stops snapshotting all aggregates where the last event occurred before the indicated timestamp. Any existing snapshots for this aggregate are also deleted.



72
73
74
75
76
77
78
79
80
# File 'lib/sequent/core/snapshot_store.rb', line 72

def clear_aggregates_for_snapshotting_with_last_event_before(timestamp)
  connection.exec_update(<<~EOS, 'clear_aggregates_for_snapshotting_with_last_event_before', [timestamp])
    DELETE FROM aggregates_that_need_snapshots s
     WHERE NOT EXISTS (SELECT *
                         FROM aggregates a
                         JOIN events e ON (a.aggregate_id, a.events_partition_key) = (e.aggregate_id, e.partition_key)
                        WHERE a.aggregate_id = s.aggregate_id AND e.created_at >= $1)
  EOS
end

#delete_all_snapshotsObject

Deletes all snapshots for all aggregates



33
34
35
# File 'lib/sequent/core/snapshot_store.rb', line 33

def delete_all_snapshots
  call_procedure(connection, 'delete_all_snapshots', [Time.now])
end

#delete_snapshots_before(aggregate_id, sequence_number) ⇒ Object

Deletes all snapshots for aggregate_id with a sequence_number lower than the specified sequence number.



38
39
40
# File 'lib/sequent/core/snapshot_store.rb', line 38

def delete_snapshots_before(aggregate_id, sequence_number)
  call_procedure(connection, 'delete_snapshots_before', [aggregate_id, sequence_number, Time.now])
end

#load_latest_snapshot(aggregate_id) ⇒ Object



27
28
29
30
# File 'lib/sequent/core/snapshot_store.rb', line 27

def load_latest_snapshot(aggregate_id)
  snapshot_hash = query_function(connection, 'load_latest_snapshot', [aggregate_id]).first
  deserialize_event(snapshot_hash) unless snapshot_hash['aggregate_id'].nil?
end

#mark_aggregate_for_snapshotting(aggregate_id, snapshot_outdated_at: Time.now) ⇒ Object

Marks an aggregate for snapshotting. Marked aggregates will be picked up by the background snapshotting task. Another way to mark aggregates for snapshotting is to pass the EventStream#snapshot_outdated_at property to the #store_events method as is done automatically by the AggregateRepository based on the aggregate’s snapshot_threshold.



49
50
51
52
53
54
55
56
57
# File 'lib/sequent/core/snapshot_store.rb', line 49

def mark_aggregate_for_snapshotting(aggregate_id, snapshot_outdated_at: Time.now)
  connection.exec_update(<<~EOS, 'mark_aggregate_for_snapshotting', [aggregate_id, snapshot_outdated_at])
    INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_outdated_at)
    VALUES ($1, $2)
        ON CONFLICT (aggregate_id) DO UPDATE
       SET snapshot_outdated_at = LEAST(row.snapshot_outdated_at, EXCLUDED.snapshot_outdated_at),
           snapshot_scheduled_at = NULL
  EOS
end

#select_aggregates_for_snapshotting(limit:, reschedule_snapshots_scheduled_before: nil) ⇒ Object



95
96
97
98
99
100
101
102
# File 'lib/sequent/core/snapshot_store.rb', line 95

def select_aggregates_for_snapshotting(limit:, reschedule_snapshots_scheduled_before: nil)
  query_function(
    connection,
    'select_aggregates_for_snapshotting',
    [limit, reschedule_snapshots_scheduled_before, Time.now],
    columns: ['aggregate_id'],
  ).pluck('aggregate_id')
end

#store_snapshots(snapshots) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/sequent/core/snapshot_store.rb', line 11

def store_snapshots(snapshots)
  json = Sequent::Core::Oj.dump(
    snapshots.map do |snapshot|
      {
        aggregate_id: snapshot.aggregate_id,
        sequence_number: snapshot.sequence_number,
        created_at: snapshot.created_at,
        snapshot_type: snapshot.class.name,
        snapshot_json: snapshot,
      }
    end,
  )

  call_procedure(connection, 'store_snapshots', [json])
end