Class: Sequent::Migrations::ViewSchema

Inherits:
Object
Includes:
Sql, Util::Printer, Util::Timer
Defined in:
lib/sequent/migrations/view_schema.rb

Overview

ViewSchema is used to migrate your view_schema, for instance when you create new Projectors or change existing ones.

The following migrations are supported:

  • ReplayTable (Projector migrations)

  • AlterTable (For instance if you introduce a new column)

To maintain your migrations you need to:

  1. Create a class that extends `Sequent::Migrations::Projectors` and specify it in `Sequent.configuration.migrations_class_name`

  2. Define per version which migrations you want to execute. See the definition of `Sequent::Migrations::Projectors.versions` and `Sequent::Migrations::Projectors.version`

  3. Specify in Sequent where your sql files reside (`Sequent.configuration.migration_sql_files_directory`)

  4. Ensure that you add %SUFFIX% to each name that needs to be unique in postgres (like TABLE names, INDEX names, PRIMARY KEYS). E.g. `create table foo%SUFFIX% (id serial NOT NULL, CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))`

  5. If you want to run an `alter_table` migration, ensure that a sql file named `table_name_VERSION.sql` exists.

Example:

class AppMigrations < Sequent::Migrations::Projectors
  def self.version
    '3'
  end

  def self.versions
    {
      '1' => [Sequent.all_projectors],
      '2' => [
        UserProjector,
        InvoiceProjector,
      ],
      '3' => [
        Sequent::Migrations.alter_table(UserRecord),
      ],
    }
  end
end
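
The %SUFFIX% convention from step 4 can be illustrated with a small, standalone sketch. The helper name `render_sql` is hypothetical and not part of Sequent, and the `_VERSION` suffix format is an assumption inferred from the `table_name_VERSION` naming used on this page. The only behavior confirmed by the source is that `#create_view_tables` removes the placeholder entirely.

```ruby
# Hypothetical helper, not part of Sequent: expands the %SUFFIX% placeholder.
# Passing no version strips the placeholder, as #create_view_tables does.
# The "_<version>" format is an assumption made for illustration.
def render_sql(template, version = nil)
  suffix = version.nil? ? '' : "_#{version}"
  template.gsub('%SUFFIX%', suffix)
end

template = 'create table foo%SUFFIX% (id serial NOT NULL, ' \
           'CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))'

puts render_sql(template, 3) # names carry the version while a migration runs
puts render_sql(template)    # plain names, placeholder removed
```

Because every table, index and constraint name carries the placeholder, the same sql file can describe both the versioned tables built during a migration and the final, unsuffixed tables.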

Defined Under Namespace

Classes: ReplayedIds, Versions

Constant Summary

LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE = 3

Corresponds with the index on the aggregate_id column in the event_records table.

Since we replay in batches keyed on the first 3 characters of the UUID, we created an index on these 3 characters. Hence the name ;-)

This also means that the online replay is divided into 16**3 (4096) groups. This might seem a lot for a new event store, but as the number of events grows you will see that it partitions the data quite well.
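
The group count follows from the fact that each of the 3 characters is a hexadecimal digit. The sketch below only illustrates that arithmetic. The helper `hex_prefixes` is hypothetical, and how Sequent builds its groups internally is not shown on this page.

```ruby
# Hypothetical helper: lists every hexadecimal prefix of the given length.
def hex_prefixes(length)
  (0...16**length).map { |i| i.to_s(16).rjust(length, '0') }
end

prefixes = hex_prefixes(3)
puts prefixes.size  # number of groups for a 3-character prefix
puts prefixes.first # lowest prefix
puts prefixes.last  # highest prefix

# An aggregate id belongs to the group named by its first 3 characters.
aggregate_id = '3f2a9c1e-0000-4000-8000-000000000000'
puts aggregate_id[0, 3]
```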

Methods included from Sql

#exec_sql, #sql_file_to_statements

Methods included from Util::Printer

#recursively_print

Methods included from Util::Timer

#time

Constructor Details

#initialize(db_config:) ⇒ ViewSchema

Returns a new instance of ViewSchema.



# File 'lib/sequent/migrations/view_schema.rb', line 105

def initialize(db_config:)
  @db_config = db_config
  @view_schema = Sequent.configuration.view_schema_name
  @logger = Sequent.logger
end

Instance Attribute Details

#db_config ⇒ Object (readonly)

Returns the value of attribute db_config.



# File 'lib/sequent/migrations/view_schema.rb', line 80

def db_config
  @db_config
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/sequent/migrations/view_schema.rb', line 80

def logger
  @logger
end

#view_schema ⇒ Object (readonly)

Returns the value of attribute view_schema.



# File 'lib/sequent/migrations/view_schema.rb', line 80

def view_schema
  @view_schema
end

Class Method Details

.create_view_schema_if_not_exists(env:) ⇒ Object

Parameters:

  • env (String)

    The environment used for connecting to the database

# File 'lib/sequent/migrations/view_schema.rb', line 95

def create_view_schema_if_not_exists(env:)
  fail ArgumentError, 'env is required' if env.blank?

  db_config = Sequent::Support::Database.read_config(env)
  Sequent::Support::Database.establish_connection(db_config)

  new(db_config: db_config).create_view_schema_if_not_exists
end

.create_view_tables(env:) ⇒ Object

Parameters:

  • env (String)

    The environment used for connecting to the database

# File 'lib/sequent/migrations/view_schema.rb', line 85

def create_view_tables(env:)
  fail ArgumentError, 'env is required' if env.blank?

  db_config = Sequent::Support::Database.read_config(env)
  Sequent::Support::Database.establish_connection(db_config)
  new(db_config: db_config).create_view_tables
end

Instance Method Details

#create_view_schema_if_not_exists ⇒ Object

Utility method that creates the view_schema and the metadata tables.

This method is mainly useful during the initial setup of the view schema.



# File 'lib/sequent/migrations/view_schema.rb', line 161

def create_view_schema_if_not_exists
  exec_sql(%(CREATE SCHEMA IF NOT EXISTS #{view_schema}))
  in_view_schema do
    exec_sql(<<~SQL.chomp)
      CREATE TABLE IF NOT EXISTS #{Versions.table_name} (version integer NOT NULL, CONSTRAINT version_pk PRIMARY KEY(version))
    SQL
    exec_sql(<<~SQL.chomp)
      CREATE TABLE IF NOT EXISTS #{ReplayedIds.table_name} (event_id bigint NOT NULL, CONSTRAINT event_id_pk PRIMARY KEY(event_id))
    SQL
  end
end

#create_view_tables ⇒ Object

Utility method that creates all tables in the view schema.

This method is mainly useful in test scenarios, to create the entire view schema without replaying the events.



# File 'lib/sequent/migrations/view_schema.rb', line 122

def create_view_tables
  create_view_schema_if_not_exists
  return if Sequent.migration_class == Sequent::Migrations::Projectors
  return if Sequent.new_version == current_version

  in_view_schema do
    Sequent::Core::Migratable.all.flat_map(&:managed_tables).each do |table|
      sql_file = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.sql"
      statements = sql_file_to_statements(sql_file) do |raw_sql|
        raw_sql.remove('%SUFFIX%')
      end
      statements.each { |statement| exec_sql(statement) }

      indexes_file_name = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.indexes.sql"
      if File.exist?(indexes_file_name)
        statements = sql_file_to_statements(indexes_file_name) { |raw_sql| raw_sql.remove('%SUFFIX%') }
        statements.each(&method(:exec_sql))
      end
    end
    Versions.create!(version: Sequent.new_version)
  end
end

#current_version ⇒ Object

Returns the current version from the database



# File 'lib/sequent/migrations/view_schema.rb', line 113

def current_version
  Versions.order('version desc').limit(1).first&.version || 0
end
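
The `&.version || 0` idiom above means that an empty versions table yields version 0. A minimal sketch of the same fallback without a database, where `latest_version` and the `Struct` rows are hypothetical stand-ins for the ActiveRecord query:

```ruby
# Hypothetical stand-in for the query above: picks the highest version,
# or falls back to 0 when there are no rows at all.
def latest_version(rows)
  rows.max_by(&:version)&.version || 0
end

row = Struct.new(:version) # stand-in for a record of the versions table

puts latest_version([])                                # empty table
puts latest_version([row.new(1), row.new(3), row.new(2)])
```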

#executor ⇒ Object



# File 'lib/sequent/migrations/view_schema.rb', line 177

def executor
  @executor ||= Executor.new
end

#migrate_offline ⇒ Object

Last part of a view schema migration

You have to ensure no events are being added to the event store while this method is running. For instance, put your application in maintenance mode.

The offline part consists of:

  1. Replay all events not yet replayed since #migrate_online

  2. Within a single transaction:

     2.1 Rename the current tables, using the current version as SUFFIX
     2.2 Rename the new tables, removing the new version suffix
     2.3 Add the new version to the Versions table

  3. Clean up the replayed event ids

If anything fails an exception is raised and everything is rolled back.

When this method succeeds you can safely start the application from Sequent’s point of view.



# File 'lib/sequent/migrations/view_schema.rb', line 241

def migrate_offline
  return if Sequent.new_version == current_version

  ensure_version_correct!

  executor.set_table_names_to_new_version(plan)

  # 1 replay events not yet replayed
  if plan.projectors.any?
    replay!(
      Sequent.configuration.offline_replay_persistor_class.new,
      exclude_ids: true,
      group_exponent: 1,
    )
  end

  in_view_schema do
    Sequent::ApplicationRecord.transaction do
      # 2.1, 2.2
      executor.execute_offline(plan, current_version)
      # 2.3 Create migration record
      Versions.create!(version: Sequent.new_version)
    end

    # 3. Truncate replayed ids
    truncate_replay_ids_table!
  end
  logger.info "Migrated to version #{Sequent.new_version}"
  # rubocop:disable Lint/RescueException
rescue Exception => e
  # rubocop:enable Lint/RescueException
  rollback_migration
  raise e
end
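
The table swap in steps 2.1 and 2.2 can be pictured as a pair of renames per table. The sketch below only builds the statements as strings. `swap_statements` is a hypothetical helper, and the exact SQL that `executor.execute_offline` issues is not shown on this page, so treat the output as an assumption about its shape rather than Sequent's actual statements.

```ruby
# Hypothetical sketch of steps 2.1 and 2.2: for each table, move the live
# table aside under the current version, then promote the new versioned table.
# Index and constraint names are ignored here for brevity.
def swap_statements(table_names, current_version:, new_version:)
  table_names.flat_map do |name|
    [
      "ALTER TABLE #{name} RENAME TO #{name}_#{current_version}",
      "ALTER TABLE #{name}_#{new_version} RENAME TO #{name}",
    ]
  end
end

puts swap_statements(%w[user_records], current_version: 2, new_version: 3)
```

Running both renames inside one transaction, as the method above does, means readers never observe a moment in which the unsuffixed table is missing.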

#migrate_online ⇒ Object

First part of a view schema migration

Call this method while your application is running. The online part consists of:

  1. Ensure any previous migrations are cleaned up

  2. Create new tables for the Projectors which need to be migrated to the new version. These tables will be called `table_name_VERSION`.

  3. Replay all events to populate the tables. It keeps track of all events that are already replayed.

  4. Reset the table names of the ActiveRecord models (projections) back to their original values (so without the VERSION suffix)

If anything fails an exception is raised and everything is rolled back.



# File 'lib/sequent/migrations/view_schema.rb', line 197

def migrate_online
  return if Sequent.new_version == current_version

  ensure_version_correct!

  in_view_schema do
    truncate_replay_ids_table!

    drop_old_tables(Sequent.new_version)
    executor.execute_online(plan)
  end

  replay!(Sequent.configuration.online_replay_persistor_class.new) if plan.projectors.any?

  in_view_schema do
    executor.create_indexes_after_execute_online(plan)
  end
  executor.reset_table_names(plan)
  # rubocop:disable Lint/RescueException
rescue Exception => e
  # rubocop:enable Lint/RescueException
  rollback_migration
  raise e
end
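
The interplay between the online and offline phases rests on remembering which events were already replayed. The in-memory model below is a simplified sketch, not Sequent's implementation: the `replay` function, the event hashes and the array used as a projection are all hypothetical, and only the `exclude_ids` keyword mirrors a name from the source.

```ruby
require 'set'

# Hypothetical model of a replay pass. With exclude_ids: true, events whose
# ids were recorded by an earlier pass are skipped.
def replay(events, replayed_ids, projection, exclude_ids: false)
  pending = exclude_ids ? events.reject { |e| replayed_ids.include?(e[:id]) } : events
  pending.each do |event|
    projection << event[:payload] # stand-in for updating a view table
    replayed_ids << event[:id]    # remember that this event was handled
  end
  projection
end

store = [{ id: 1, payload: 'created' }, { id: 2, payload: 'renamed' }]
replayed_ids = Set.new
projection = []

# Online phase: the application keeps running while existing events are replayed.
replay(store, replayed_ids, projection)

# A new event arrives before the application goes into maintenance mode.
store << { id: 3, payload: 'deleted' }

# Offline phase: only events not yet replayed are applied.
replay(store, replayed_ids, projection, exclude_ids: true)

p projection
```

Since the bulk of the events is handled online, the offline phase only has to process what arrived in between, which keeps the maintenance window short.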

#plan ⇒ Object



# File 'lib/sequent/migrations/view_schema.rb', line 173

def plan
  @plan ||= Planner.new(Sequent.migration_class.versions).plan(current_version, Sequent.new_version)
end

#replay_all!(group_exponent: 1) ⇒ Object

Utility method that replays events for all managed_tables of all Sequent::Core::Projector classes.

This method is mainly useful in test scenarios or development tasks.



# File 'lib/sequent/migrations/view_schema.rb', line 149

def replay_all!(group_exponent: 1)
  replay!(
    Sequent.configuration.online_replay_persistor_class.new,
    projectors: Core::Migratable.projectors,
    group_exponent: group_exponent,
  )
end