Module: EventSourcery::Postgres::Schema

Defined in:
lib/event_sourcery/postgres/schema.rb

Class Method Summary collapse

Class Method Details

.create_aggregates(db: EventSourcery::Postgres.config.event_store_database, table_name: EventSourcery::Postgres.config.aggregates_table_name) ⇒ Object

Create the aggregates table. Needs the database and the table name. The defaults will be whats specified in config.

Parameters:

  • db (defaults to: EventSourcery::Postgres.config.event_store_database)

    the Postgres database to use

  • table_name (defaults to: EventSourcery::Postgres.config.aggregates_table_name)

    the name of the aggregates table



53
54
55
56
57
58
59
# File 'lib/event_sourcery/postgres/schema.rb', line 53

def create_aggregates(db: EventSourcery::Postgres.config.event_store_database,
                      table_name: EventSourcery::Postgres.config.aggregates_table_name)
  db.create_table(table_name) do
    uuid :aggregate_id, primary_key: true
    column :version, :bigint, default: 1
  end
end

.create_event_store(db: EventSourcery::Postgres.config.event_store_database, events_table_name: EventSourcery::Postgres.config.events_table_name, aggregates_table_name: EventSourcery::Postgres.config.aggregates_table_name, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name) ⇒ Object

This will create the event store tables and functions (event, aggregates, tracker and create or update functions) for the given Postgres database. The default will be the one specified in the config.

Parameters:

  • db (defaults to: EventSourcery::Postgres.config.event_store_database)

    the Postgres database to use



12
13
14
15
16
17
18
19
# File 'lib/event_sourcery/postgres/schema.rb', line 12

def create_event_store(db: EventSourcery::Postgres.config.event_store_database,
                       events_table_name: EventSourcery::Postgres.config.events_table_name,
                       aggregates_table_name: EventSourcery::Postgres.config.aggregates_table_name,
                       write_events_function_name: EventSourcery::Postgres.config.write_events_function_name)
  create_events(db: db, table_name: events_table_name)
  create_aggregates(db: db, table_name: aggregates_table_name)
  create_or_update_functions(db: db, events_table_name: events_table_name, function_name: write_events_function_name, aggregates_table_name: aggregates_table_name)
end

.create_events(db: EventSourcery::Postgres.config.event_store_database, table_name: EventSourcery::Postgres.config.events_table_name) ⇒ Object

Create the events table. Needs the database and the table name. The defaults will be whats specified in config.

Parameters:

  • db (defaults to: EventSourcery::Postgres.config.event_store_database)

    the Postgres database to use

  • table_name (defaults to: EventSourcery::Postgres.config.events_table_name)

    the name of the events table



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/event_sourcery/postgres/schema.rb', line 26

def create_events(db: EventSourcery::Postgres.config.event_store_database,
                  table_name: EventSourcery::Postgres.config.events_table_name)
  db.run 'CREATE EXTENSION IF NOT EXISTS "uuid-ossp"'
  db.create_table(table_name) do
    primary_key :id, type: :Bignum
    column :uuid,           :uuid,    null: false, default: Sequel.lit('uuid_generate_v4()')
    column :aggregate_id,   :uuid,    null: false
    column :type,           :varchar, null: false, size: 255
    column :body,           :json,    null: false
    column :version,        :bigint,  null: false
    column :correlation_id, :uuid
    column :causation_id,   :uuid
    column :created_at,     :'timestamp without time zone', null: false, default: Sequel.lit("(now() at time zone 'utc')")
    index [:aggregate_id, :version], unique: true
    index :uuid, unique: true
    index :type
    index :correlation_id
    index :causation_id
    index :created_at
  end
end

.create_or_update_functions(db: EventSourcery::Postgres.config.event_store_database, function_name: EventSourcery::Postgres.config.write_events_function_name, events_table_name: EventSourcery::Postgres.config.events_table_name, aggregates_table_name: EventSourcery::Postgres.config.aggregates_table_name) ⇒ Object

Create the ‘create or update’ functions. Needs the database, table name, function name and aggregates table name. The defaults will be whats specified in config.

Parameters:

  • db (defaults to: EventSourcery::Postgres.config.event_store_database)

    the Postgres database to use

  • function_name (defaults to: EventSourcery::Postgres.config.write_events_function_name)

    the name of the write events function

  • events_table_name (defaults to: EventSourcery::Postgres.config.events_table_name)

    the name of the events table

  • aggregates_table_name (defaults to: EventSourcery::Postgres.config.aggregates_table_name)

    the name of the aggregates table



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/event_sourcery/postgres/schema.rb', line 69

def create_or_update_functions(db: EventSourcery::Postgres.config.event_store_database,
                               function_name: EventSourcery::Postgres.config.write_events_function_name,
                               events_table_name: EventSourcery::Postgres.config.events_table_name,
                               aggregates_table_name: EventSourcery::Postgres.config.aggregates_table_name)
  db.run <<-SQL
create or replace function #{function_name}(_aggregateId uuid,
                                      _eventTypes varchar[],
                                      _expectedVersion int,
                                      _bodies json[],
                                      _createdAtTimes timestamp without time zone[],
                                      _eventUUIDs uuid[],
                                      _correlationIds uuid[],
                                      _causationIds uuid[],
                                      _lockTable boolean) returns void as $$
declare
currentVersion int;
body json;
eventVersion int;
eventId text;
index int;
newVersion int;
numEvents int;
createdAt timestamp without time zone;
begin
numEvents := array_length(_bodies, 1);
select version into currentVersion from #{aggregates_table_name} where aggregate_id = _aggregateId;
if not found then
  -- when we have no existing version for this aggregate
  if _expectedVersion = 0 or _expectedVersion is null then
    -- set the version to 1 if expected version is null or 0
    insert into #{aggregates_table_name}(aggregate_id, version) values(_aggregateId, numEvents);
    currentVersion := 0;
  else
    raise 'Concurrency conflict. Current version: 0, expected version: %', _expectedVersion;
  end if;
else
  if _expectedVersion is null then
    -- automatically increment the version
    update #{aggregates_table_name} set version = version + numEvents where aggregate_id = _aggregateId returning version into newVersion;
    currentVersion := newVersion - numEvents;
  else
    -- increment the version if it's at our expected version
    update #{aggregates_table_name} set version = version + numEvents where aggregate_id = _aggregateId and version = _expectedVersion;
    if not found then
-- version was not at expected_version, raise an error.
-- currentVersion may not equal what it did in the database when the
-- above update statement is executed (it may have been incremented by another
-- process)
raise 'Concurrency conflict. Last known current version: %, expected version: %', currentVersion, _expectedVersion;
    end if;
  end if;
end if;
index := 1;
eventVersion := currentVersion + 1;
if _lockTable then
    -- Ensure this transaction is the only one writing events to guarantee
    -- linear growth of sequence IDs.
    -- Any value that won't conflict with other advisory locks will work.
    -- The Postgres tracker currently obtains an advisory lock using it's
    -- integer row ID, so values 1 to the number of ESP's in the system would
    -- be taken if the tracker is running in the same database as your
    -- projections.
    perform pg_advisory_xact_lock(-1);
end if;
foreach body IN ARRAY(_bodies)
loop
  if _createdAtTimes[index] is not null then
    createdAt := _createdAtTimes[index];
  else
    createdAt := now() at time zone 'utc';
  end if;

  insert into #{events_table_name}
    (uuid, aggregate_id, type, body, version, correlation_id, causation_id, created_at)
  values
    (
_eventUUIDs[index],
_aggregateId,
_eventTypes[index],
body,
eventVersion,
_correlationIds[index],
_causationIds[index],
createdAt
    )
  returning id into eventId;

  eventVersion := eventVersion + 1;
  index := index + 1;
end loop;
perform pg_notify('new_event', eventId);
end;
$$ language plpgsql;
SQL
end

.create_projector_tracker(db: EventSourcery::Postgres.config.projections_database, table_name: EventSourcery::Postgres.config.tracker_table_name) ⇒ Object

Create the projector tracker table. Needs the database and the table name. The defaults will be whats specified in config.

Parameters:

  • db (defaults to: EventSourcery::Postgres.config.projections_database)

    the Postgres database to use

  • table_name (defaults to: EventSourcery::Postgres.config.tracker_table_name)

    the name of the aggregates table



170
171
172
173
174
175
176
177
178
# File 'lib/event_sourcery/postgres/schema.rb', line 170

def create_projector_tracker(db: EventSourcery::Postgres.config.projections_database,
                             table_name: EventSourcery::Postgres.config.tracker_table_name)
  db.create_table(table_name) do
    primary_key :id, type: :Bignum
    column :name, 'varchar(255) not null'
    column :last_processed_event_id, 'bigint not null default 0'
    index :name, unique: true
  end
end