Class: Sequent::Migrations::ViewSchema

Inherits:
Object
Includes:
Sql, Util::Printer, Util::Timer
Defined in:
lib/sequent/migrations/view_schema.rb

Overview

ViewSchema is used to migrate your view_schema, for instance when you create new Projectors or change existing Projectors.

The following migrations are supported:

  • ReplayTable (Projector migrations)

  • AlterTable (For instance if you introduce a new column)

To maintain your migrations you need to:

  1. Create a class that extends `Sequent::Migrations::Projectors` and specify it in `Sequent.configuration.migrations_class_name`.

  2. Define per version which migrations you want to execute. See the definitions of `Sequent::Migrations::Projectors.versions` and `Sequent::Migrations::Projectors.version`.

  3. Specify in Sequent where your SQL files reside (`Sequent.configuration.migration_sql_files_directory`).

  4. Ensure that you add %SUFFIX% to each name that needs to be unique in Postgres (like TABLE names, INDEX names, PRIMARY KEYS). E.g. `create table foo%SUFFIX% (id serial NOT NULL, CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))`

  5. If you want to run an `alter_table` migration, ensure that an SQL file named `table_name_VERSION.sql` exists.

Example:

class AppMigrations < Sequent::Migrations::Projectors
  def self.version
    '3'
  end

  def self.versions
    {
      '1' => [Sequent.all_projectors],
      '2' => [
        UserProjector,
        InvoiceProjector,
      ],
      '3' => [
        Sequent::Migrations.alter_table(UserRecord),
      ],
    }
  end
end
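
The %SUFFIX% convention from step 4 can be illustrated with a minimal sketch. The helper `apply_suffix` below is hypothetical and not part of Sequent; it assumes the suffix takes the form `_VERSION` (matching the `table_name_VERSION` naming described for the online migration) and that an empty suffix yields the final table name.

```ruby
# Hypothetical helper, not part of Sequent: fills in the %SUFFIX% placeholder.
# Assumption: a versioned table gets "_<version>" appended; no version means
# the placeholder is simply removed.
def apply_suffix(raw_sql, version = nil)
  suffix = version.nil? ? '' : "_#{version}"
  raw_sql.gsub('%SUFFIX%', suffix)
end

TEMPLATE = 'create table foo%SUFFIX% (id serial NOT NULL, ' \
           'CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))'

puts apply_suffix(TEMPLATE, 3) # table and constraint names end in _3
puts apply_suffix(TEMPLATE)    # plain names, placeholder removed
```

Because every unique name carries the placeholder, the versioned table and the current table can exist side by side in the same schema without name clashes.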

Direct Known Subclasses

DryRun::ViewSchema

Methods included from Sql

#exec_sql, #sql_file_to_statements

Methods included from Util::Printer

#recursively_print

Methods included from Util::Timer

#time

Constructor Details

#initialize(db_config:) ⇒ ViewSchema

Returns a new instance of ViewSchema.


# File 'lib/sequent/migrations/view_schema.rb', line 93

def initialize(db_config:)
  @db_config = db_config
  @view_schema = Sequent.configuration.view_schema_name
  @logger = Sequent.logger
end

Instance Attribute Details

#db_config ⇒ Object (readonly)

Returns the value of attribute db_config.


# File 'lib/sequent/migrations/view_schema.rb', line 68

def db_config
  @db_config
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.


# File 'lib/sequent/migrations/view_schema.rb', line 68

def logger
  @logger
end

#view_schema ⇒ Object (readonly)

Returns the value of attribute view_schema.


# File 'lib/sequent/migrations/view_schema.rb', line 68

def view_schema
  @view_schema
end

Class Method Details

.create_view_schema_if_not_exists(env:) ⇒ Object

Parameters:

  • env (String)

    The environment used for connecting the database

See Also:

  • #create_view_schema_if_not_exists


# File 'lib/sequent/migrations/view_schema.rb', line 83

def create_view_schema_if_not_exists(env:)
  fail ArgumentError, 'env is required' if env.blank?

  db_config = Sequent::Support::Database.read_config(env)
  Sequent::Support::Database.establish_connection(db_config)

  new(db_config: db_config).create_view_schema_if_not_exists
end

.create_view_tables(env:) ⇒ Object

Parameters:

  • env (String)

    The environment used for connecting the database

See Also:

  • #create_view_tables


# File 'lib/sequent/migrations/view_schema.rb', line 73

def create_view_tables(env:)
  fail ArgumentError, 'env is required' if env.blank?

  db_config = Sequent::Support::Database.read_config(env)
  Sequent::Support::Database.establish_connection(db_config)
  new(db_config: db_config).create_view_tables
end

Instance Method Details

#create_view_schema_if_not_exists ⇒ Object

Utility method that creates the view_schema and the metadata tables.

This method is mainly useful during the initial setup of the view schema.


# File 'lib/sequent/migrations/view_schema.rb', line 148

def create_view_schema_if_not_exists
  exec_sql(%(CREATE SCHEMA IF NOT EXISTS #{view_schema}))
  
end

#create_view_tables ⇒ Object

Utility method that creates all tables in the view schema.

This method is mainly useful in test scenarios to create the entire view schema without replaying the events.


# File 'lib/sequent/migrations/view_schema.rb', line 110

def create_view_tables
  create_view_schema_if_not_exists
  return if Sequent.migration_class == Sequent::Migrations::Projectors
  return if Sequent.new_version == current_version

  in_view_schema do
    Sequent::Core::Migratable.all.flat_map(&:managed_tables).each do |table|
      sql_file = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.sql"
      statements = sql_file_to_statements(sql_file) do |raw_sql|
        raw_sql.remove('%SUFFIX%')
      end
      statements.each { |statement| exec_sql(statement) }

      indexes_file_name = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.indexes.sql"
      if File.exist?(indexes_file_name)
        statements = sql_file_to_statements(indexes_file_name) { |raw_sql| raw_sql.remove('%SUFFIX%') }
        statements.each(&method(:exec_sql))
      end
    end
    Versions.create!(version: Sequent.new_version)
  end
end
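
The file naming convention used above (one `table_name.sql` per table plus an optional `table_name.indexes.sql`) can be sketched in isolation. The helper `sql_files_for` and the table names are hypothetical and only mirror the path construction and the `File.exist?` check in the listing; they are not part of Sequent.

```ruby
require 'tmpdir'

# Hypothetical helper, not part of Sequent: returns the SQL files that would
# be read for one table, following the naming used in create_view_tables.
# The table file is always expected; the indexes file is optional.
def sql_files_for(directory, table_name)
  table_file = File.join(directory, "#{table_name}.sql")
  index_file = File.join(directory, "#{table_name}.indexes.sql")

  files = [table_file]
  files << index_file if File.exist?(index_file)
  files
end

Dir.mktmpdir do |dir|
  # Illustrative files; the table names are made up for this example.
  File.write(File.join(dir, 'user_records.sql'), 'create table user_records%SUFFIX% (id serial)')
  File.write(File.join(dir, 'user_records.indexes.sql'), 'create index user_idx%SUFFIX% on user_records%SUFFIX% (id)')
  File.write(File.join(dir, 'invoice_records.sql'), 'create table invoice_records%SUFFIX% (id serial)')

  p sql_files_for(dir, 'user_records').map { |f| File.basename(f) }
  p sql_files_for(dir, 'invoice_records').map { |f| File.basename(f) }
end
```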

#current_version ⇒ Object

Returns the current version from the database.


# File 'lib/sequent/migrations/view_schema.rb', line 101

def current_version
  Versions.current_version
end

#executor ⇒ Object


# File 'lib/sequent/migrations/view_schema.rb', line 157

def executor
  @executor ||= Executor.new
end

#migrate_offline ⇒ Object

Last part of a view schema migration.

You have to ensure no events are being added to the event store while this method is running. For instance, put your application in maintenance mode.

The offline part consists of:

  1. Replay all events not yet replayed since #migrate_online

  2. Within a single transaction do:

     2.1 Rename current tables with the current version as SUFFIX
     2.2 Rename the new tables and remove the new version suffix
     2.3 Add the new version in the Versions table

  3. Update the versions table to complete the migration

If anything fails, an exception is raised and everything is rolled back.

When this method succeeds you can safely start the application from Sequent’s point of view.


# File 'lib/sequent/migrations/view_schema.rb', line 236

def migrate_offline
  return if Sequent.new_version == current_version

  ensure_version_correct!
  in_view_schema { Versions.start_offline!(Sequent.new_version) }
  Sequent.logger.info("Start migrate_offline for version #{Sequent.new_version}")

  executor.set_table_names_to_new_version(plan)

  # 1 replay events not yet replayed
  if plan.projectors.any?
    replay!(
      Sequent.configuration.offline_replay_persistor_class.new,
      minimum_xact_id_inclusive: Versions.running.first.xmin_xact_id,
    )
  end

  in_view_schema do
    Sequent::ApplicationRecord.transaction do
      # 2.1, 2.2
      executor.execute_offline(plan, current_version)
      # 2.3 Create migration record
      Versions.end_offline!(Sequent.new_version)
    end
  end
  logger.info "Migrated to version #{Sequent.new_version}"
rescue ConcurrentMigration
  raise
rescue MigrationDone
  # no-op same as Sequent.new_version == current_version
rescue Exception => e # rubocop:disable Lint/RescueException
  rollback_migration
  raise e
end
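
Steps 2.1 and 2.2 of the offline part amount to a table swap. The sketch below only generates the rename statements to make the order visible; `rename_statements` is a hypothetical helper, not Sequent's `Executor`, and it assumes the `table_name_VERSION` naming described for the online part.

```ruby
# Hypothetical sketch, not Sequent's Executor: builds the PostgreSQL rename
# statements for swapping each table to its new version.
# Assumption: versioned tables are named "<table>_<version>".
def rename_statements(table_names, current_version:, new_version:)
  table_names.flat_map do |table|
    [
      # 2.1 park the live table under the current version suffix
      "ALTER TABLE #{table} RENAME TO #{table}_#{current_version}",
      # 2.2 promote the freshly replayed table to the plain name
      "ALTER TABLE #{table}_#{new_version} RENAME TO #{table}",
    ]
  end
end

puts rename_statements(%w[user_records], current_version: 2, new_version: 3)
```

Running both renames inside a single transaction, as the listing does with `Sequent::ApplicationRecord.transaction`, means readers see either the old tables or the new ones, never a mix.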

#migrate_online ⇒ Object

First part of a view schema migration.

Call this method while your application is running. The online part consists of:

  1. Ensure any previous migrations are cleaned up

  2. Create new tables for the Projectors which need to be migrated to the new version. These tables will be called `table_name_VERSION`.

  3. Replay all events to populate the tables. It keeps track of all events that are already replayed.

  4. Reset the table names of the ActiveRecord models (projections) back to their original values (so without the VERSION suffix)

If anything fails, an exception is raised and everything is rolled back.

Raises:

  • ConcurrentMigration if a migration is already running


# File 'lib/sequent/migrations/view_schema.rb', line 178

def migrate_online
  
  ensure_valid_plan!

  return if Sequent.new_version == current_version

  ensure_version_correct!

  Sequent.logger.info("Start migrate_online for version #{Sequent.new_version}")

  in_view_schema do
    Versions.start_online!(Sequent.new_version)

    drop_old_tables(Sequent.new_version)
    executor.execute_online(plan)
  end

  if plan.projectors.any?
    replay!(
      Sequent.configuration.online_replay_persistor_class.new,
      maximum_xact_id_exclusive: Versions.running.first.xmin_xact_id,
    )
  end

  in_view_schema do
    executor.create_indexes_after_execute_online(plan)
    executor.reset_table_names(plan)
    Versions.end_online!(Sequent.new_version)
  end
  Sequent.logger.info("Done migrate_online for version #{Sequent.new_version}")
rescue ConcurrentMigration, InvalidMigrationDefinition
  # ConcurrentMigration: Do not rollback the migration when this is a concurrent migration
  #                      as the other one is running
  # InvalidMigrationDefinition: Do not roll back the migration since there is nothing to roll back
  raise
rescue Exception => e # rubocop:disable Lint/RescueException
  rollback_migration
  raise e
end
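
The online replay is bounded by `maximum_xact_id_exclusive` and the offline replay by `minimum_xact_id_inclusive`, both taken from the same `xmin_xact_id`. A minimal sketch of why these two bounds are complementary follows; the `Event` struct and both filter methods are hypothetical, and how Sequent actually applies the bounds is not shown in this page.

```ruby
# Hypothetical model: each event carries the id of the transaction that wrote it.
Event = Struct.new(:sequence, :xact_id)

# Events eligible for the online replay: strictly below the boundary.
def online_batch(events, maximum_xact_id_exclusive:)
  events.select { |event| event.xact_id < maximum_xact_id_exclusive }
end

# Events eligible for the offline replay: at or above the same boundary.
def offline_batch(events, minimum_xact_id_inclusive:)
  events.select { |event| event.xact_id >= minimum_xact_id_inclusive }
end

events = [Event.new(1, 100), Event.new(2, 105), Event.new(3, 110), Event.new(4, 120)]
boundary = 110

online = online_batch(events, maximum_xact_id_exclusive: boundary)
offline = offline_batch(events, minimum_xact_id_inclusive: boundary)

p online.map(&:sequence)
p offline.map(&:sequence)
```

With an exclusive upper bound on one side and an inclusive lower bound on the other, every event falls into exactly one of the two batches, so nothing is replayed twice or skipped.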

#plan ⇒ Object


# File 'lib/sequent/migrations/view_schema.rb', line 153

def plan
  @plan ||= Planner.new(Sequent.migration_class.versions).plan(current_version, Sequent.new_version)
end
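
The plan is derived from the `versions` hash together with the current and new version. As a rough illustration of the idea only, the sketch below collects the migrations of every version after the current one up to and including the new one. `pending_migrations` is hypothetical; Sequent's `Planner` is not shown in this page and may do more (for example validation or de-duplication).

```ruby
# Hypothetical sketch, not Sequent's Planner: selects the migrations defined
# for versions in the range (current_version, new_version], in version order.
def pending_migrations(versions, current_version, new_version)
  versions
    .select { |version, _| version.to_i > current_version.to_i && version.to_i <= new_version.to_i }
    .sort_by { |version, _| version.to_i }
    .flat_map { |_, migrations| migrations }
end

# Symbols stand in for projector classes and alter_table migrations.
versions = {
  '1' => [:all_projectors],
  '2' => [:user_projector, :invoice_projector],
  '3' => [:alter_user_record],
}

p pending_migrations(versions, '1', '3')
p pending_migrations(versions, '3', '3')
```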

#replay_all! ⇒ Object

Utility method that replays events for all managed_tables from all Sequent::Core::Projectors.

This method is mainly useful in test scenarios or development tasks.


# File 'lib/sequent/migrations/view_schema.rb', line 137

def replay_all!
  replay!(
    Sequent.configuration.online_replay_persistor_class.new,
    projectors: Core::Migratable.projectors,
  )
end