Class: Sequent::Migrations::ViewSchema
- Inherits:
-
Object
- Object
- Sequent::Migrations::ViewSchema
- Includes:
- Sql, Util::Printer, Util::Timer
- Defined in:
- lib/sequent/migrations/view_schema.rb
Overview
ViewSchema is used for migration of you view_schema. For instance when you create new Projectors or change existing Projectors.
The following migrations are supported:
-
ReplayTable (Projector migrations)
-
AlterTable (For instance if you introduce a new column)
To maintain your migrations you need to:
-
Create a class that extends ‘Sequent::Migrations::Projectors` and specify in `Sequent.configuration.migrations_class_name`
-
Define per version which migrations you want to execute See the definition of ‘Sequent::Migrations::Projectors.versions` and `Sequent::Migrations::Projectors.version`
-
Specify in Sequent where your sql files reside (Sequent.configuration.migration_sql_files_directory)
-
Ensure that you add %SUFFIX% to each name that needs to be unique in postgres (like TABLE names, INDEX names, PRIMARY KEYS) E.g. ‘create table foo%SUFFIX% (id serial NOT NULL, CONSTRAINT foo_pkey%SUFFIX% PRIMARY KEY (id))`
-
If you want to run an ‘alter_table` migration ensure that
a sql file named `table_name_VERSION.sql` exists.
Example:
class AppMigrations < Sequent::Migrations::Projectors
def self.version
'3'
end
def self.versions
{
'1' => [Sequent.all_projectors],
'2' => [
UserProjector,
InvoiceProjector,
],
'3' => [
Sequent::Migrations.alter_table(UserRecord)
]
}
end
end
Defined Under Namespace
Classes: ReplayedIds, Versions
Constant Summary collapse
- LENGTH_OF_SUBSTRING_INDEX_ON_AGGREGATE_ID_IN_EVENT_STORE =
Corresponds with the index on aggregate_id column in the event_records table
Since we replay in batches of the first 3 chars of the uuid we created an index on these 3 characters. Hence the name ;-)
This also means that the online replay is divided up into 16**3 groups This might seem a lot for starting event store, but when you will get more events, you will see that this is pretty good partitioned.
3
Instance Attribute Summary collapse
-
#db_config ⇒ Object
readonly
Returns the value of attribute db_config.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#view_schema ⇒ Object
readonly
Returns the value of attribute view_schema.
Instance Method Summary collapse
-
#create_view_schema_if_not_exists ⇒ Object
Utility method that creates the view_schema and the meta data tables.
-
#create_view_tables ⇒ Object
Utility method that creates all tables in the view schema.
-
#current_version ⇒ Object
Returns the current version from the database.
- #executor ⇒ Object
-
#initialize(db_config:) ⇒ ViewSchema
constructor
A new instance of ViewSchema.
-
#migrate_offline ⇒ Object
Last part of a view schema migration.
-
#migrate_online ⇒ Object
First part of a view schema migration.
- #plan ⇒ Object
-
#replay_all! ⇒ Object
Utility method that replays events for all managed_tables from all Sequent::Core::Projector’s.
Methods included from Sql
#exec_sql, #sql_file_to_statements
Methods included from Util::Printer
Methods included from Util::Timer
Constructor Details
#initialize(db_config:) ⇒ ViewSchema
Returns a new instance of ViewSchema.
82 83 84 85 86 |
# File 'lib/sequent/migrations/view_schema.rb', line 82 def initialize(db_config:) @db_config = db_config @view_schema = Sequent.configuration.view_schema_name @logger = Sequent.logger end |
Instance Attribute Details
#db_config ⇒ Object (readonly)
Returns the value of attribute db_config.
80 81 82 |
# File 'lib/sequent/migrations/view_schema.rb', line 80 def db_config @db_config end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
80 81 82 |
# File 'lib/sequent/migrations/view_schema.rb', line 80 def logger @logger end |
#view_schema ⇒ Object (readonly)
Returns the value of attribute view_schema.
80 81 82 |
# File 'lib/sequent/migrations/view_schema.rb', line 80 def view_schema @view_schema end |
Instance Method Details
#create_view_schema_if_not_exists ⇒ Object
Utility method that creates the view_schema and the meta data tables
This method is mainly useful during an initial setup of the view schema
130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/sequent/migrations/view_schema.rb', line 130 def create_view_schema_if_not_exists exec_sql(%(CREATE SCHEMA IF NOT EXISTS #{view_schema})) in_view_schema do exec_sql(<<~SQL.chomp) CREATE TABLE IF NOT EXISTS #{Versions.table_name} (version integer NOT NULL, CONSTRAINT version_pk PRIMARY KEY(version)) SQL exec_sql(<<~SQL.chomp) CREATE TABLE IF NOT EXISTS #{ReplayedIds.table_name} (event_id bigint NOT NULL, CONSTRAINT event_id_pk PRIMARY KEY(event_id)) SQL end end |
#create_view_tables ⇒ Object
Utility method that creates all tables in the view schema
This method is mainly useful in test scenario to just create the entire view schema without replaying the events
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/sequent/migrations/view_schema.rb', line 99 def create_view_tables create_view_schema_if_not_exists in_view_schema do Sequent::Core::Migratable.all.flat_map(&:managed_tables).each do |table| sql_file = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.sql" statements = sql_file_to_statements(sql_file) do |raw_sql| raw_sql.remove('%SUFFIX%') end statements.each { |statement| exec_sql(statement) } indexes_file_name = "#{Sequent.configuration.migration_sql_files_directory}/#{table.table_name}.indexes.sql" if File.exist?(indexes_file_name) statements = sql_file_to_statements(indexes_file_name) { |raw_sql| raw_sql.remove('%SUFFIX%') } statements.each(&method(:exec_sql)) end end end end |
#current_version ⇒ Object
Returns the current version from the database
90 91 92 |
# File 'lib/sequent/migrations/view_schema.rb', line 90 def current_version Versions.order('version desc').limit(1).first&.version || 0 end |
#executor ⇒ Object
146 147 148 |
# File 'lib/sequent/migrations/view_schema.rb', line 146 def executor @executor ||= Executor.new end |
#migrate_offline ⇒ Object
Last part of a view schema migration
You have to ensure no events are being added to the event store while this method is running. For instance put your application in maintenance mode.
The offline part consists of:
-
Replay all events not yet replayed since #migration_online
-
Within a single transaction do:
2.1 Rename current tables with the current version as SUFFIX 2.2 Rename the new tables and remove the new version suffix 2.3 Add the new version in the Versions
table
-
Performs cleanup of replayed event ids
If anything fails an exception is raised and everything is rolled back
When this method succeeds you can safely start the application from Sequent’s point of view.
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/sequent/migrations/view_schema.rb', line 207 def migrate_offline return if Sequent.new_version == current_version ensure_version_correct! executor.set_table_names_to_new_version(plan) # 1 replay events not yet replayed if plan.projectors.any? replay!( Sequent.configuration.offline_replay_persistor_class.new, exclude_ids: true, group_exponent: 1, ) end in_view_schema do Sequent::ApplicationRecord.transaction do # 2.1, 2.2 executor.execute_offline(plan, current_version) # 2.3 Create migration record Versions.create!(version: Sequent.new_version) end # 3. Truncate replayed ids truncate_replay_ids_table! end logger.info "Migrated to version #{Sequent.new_version}" # rubocop:disable Lint/RescueException rescue Exception => e # rubocop:enable Lint/RescueException rollback_migration raise e end |
#migrate_online ⇒ Object
First part of a view schema migration
Call this method while your application is running. The online part consists of:
-
Ensure any previous migrations are cleaned up
-
Create new tables for the Projectors which need to be migrated to the new version
These tables will be called `table_name_VERSION`.
-
Replay all events to populate the tables
It keeps track of all events that are already replayed.
If anything fails an exception is raised and everything is rolled back
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/sequent/migrations/view_schema.rb', line 164 def migrate_online return if Sequent.new_version == current_version ensure_version_correct! in_view_schema do truncate_replay_ids_table! drop_old_tables(Sequent.new_version) executor.execute_online(plan) end replay!(Sequent.configuration.online_replay_persistor_class.new) if plan.projectors.any? in_view_schema do executor.create_indexes_after_execute_online(plan) end # rubocop:disable Lint/RescueException rescue Exception => e # rubocop:enable Lint/RescueException rollback_migration raise e end |
#plan ⇒ Object
142 143 144 |
# File 'lib/sequent/migrations/view_schema.rb', line 142 def plan @plan ||= Planner.new(Sequent.migration_class.versions).plan(current_version, Sequent.new_version) end |
#replay_all! ⇒ Object
Utility method that replays events for all managed_tables from all Sequent::Core::Projector’s
This method is mainly useful in test scenario’s or development tasks
122 123 124 |
# File 'lib/sequent/migrations/view_schema.rb', line 122 def replay_all! replay!(Sequent.configuration.online_replay_persistor_class.new) end |