Class: ChronoModel::Adapter

Inherits:
ActiveRecord::ConnectionAdapters::PostgreSQLAdapter
  • Object
show all
Defined in:
lib/chrono_model/adapter.rb

Overview

This class implements all ActiveRecord::ConnectionAdapters::SchemaStatements methods adding support for temporal extensions. It inherits from the Postgres adapter for a clean override of its methods using super.

Defined Under Namespace

Classes: TSRange

Constant Summary collapse

TEMPORAL_SCHEMA =

The schema holding current data

'temporal'
HISTORY_SCHEMA =

The schema holding historical data

'history'
RANGE_TYPE =

This is the data type used for the SCD2 validity

'tsrange'

Instance Method Summary collapse

Instance Method Details

#add_column(table_name) ⇒ Object

If adding a column to a temporal table, creates it in the table in the temporal schema and updates the triggers.


231
232
233
234
235
236
237
238
239
240
241
# File 'lib/chrono_model/adapter.rb', line 231

def add_column(table_name, *)
  return super unless is_chrono?(table_name)

  transaction do
    # Add the column to the temporal table
    _on_temporal_schema { super }

    # Update the triggers
    chrono_create_view_for(table_name)
  end
end

#add_index(table_name, column_name, options = {}) ⇒ Object

If adding an index to a temporal table, add it to the one in the temporal schema and to the history one. If the `:unique` option is present, it is removed from the index created in the history table.


203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/chrono_model/adapter.rb', line 203

def add_index(table_name, column_name, options = {})
  return super unless is_chrono?(table_name)

  transaction do
    _on_temporal_schema { super }

    # Uniqueness constraints do not make sense in the history table
    options = options.dup.tap {|o| o.delete(:unique)} if options[:unique].present?

    _on_history_schema { super table_name, column_name, options }
  end
end

#add_temporal_indexes(table, range, options = {}) ⇒ Object

Create spatial indexes for timestamp search.

This index is used by TimeMachine.at, `.current` and `.past` to build the temporal WHERE clauses that fetch the state of records at a single point in time.

Parameters:

`table`: the table where to create indexes on
`range`: the tsrange field

Options:

`:name`: the index name prefix, defaults to
         index_{table}_temporal_on_{range / lower_range / upper_range}

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/chrono_model/adapter.rb', line 336

def add_temporal_indexes(table, range, options = {})
  range_idx, lower_idx, upper_idx =
    temporal_index_names(table, range, options)

  chrono_alter_index(table, options) do
    execute <<-SQL
      CREATE INDEX #{range_idx} ON #{table} USING gist ( #{range} )
    SQL

    # Indexes used for precise history filtering, sorting and, in history
    # tables, by UPDATE / DELETE triggers.
    #
    execute "CREATE INDEX #{lower_idx} ON #{table} ( lower(#{range}) )"
    execute "CREATE INDEX #{upper_idx} ON #{table} ( upper(#{range}) )"
  end
end

#add_timeline_consistency_constraint(table, range, options = {}) ⇒ Object

Adds an EXCLUDE constraint to the given table, to assure that no more than one record can occupy a definite segment on a timeline.


378
379
380
381
382
383
384
385
386
387
388
# File 'lib/chrono_model/adapter.rb', line 378

def add_timeline_consistency_constraint(table, range, options = {})
  name = timeline_consistency_constraint_name(table)
  id   = options[:id] || primary_key(table)

  chrono_alter_constraint(table, options) do
    execute <<-SQL
      ALTER TABLE #{table} ADD CONSTRAINT #{name}
        EXCLUDE USING gist ( #{id} WITH =, #{range} WITH && )
    SQL
  end
end

#change_column(table_name) ⇒ Object

If removing a column from a temporal table, we are forced to drop the view, then change the column from the table in the temporal schema and eventually recreate the triggers.


263
264
265
266
# File 'lib/chrono_model/adapter.rb', line 263

def change_column(table_name, *)
  return super unless is_chrono?(table_name)
  chrono_alter(table_name) { super }
end

#change_column_default(table_name) ⇒ Object

Change the default on the temporal schema table.


270
271
272
273
# File 'lib/chrono_model/adapter.rb', line 270

def change_column_default(table_name, *)
  return super unless is_chrono?(table_name)
  _on_temporal_schema { super }
end

#change_column_null(table_name) ⇒ Object

Change the null constraint on the temporal schema table.


277
278
279
280
# File 'lib/chrono_model/adapter.rb', line 277

def change_column_null(table_name, *)
  return super unless is_chrono?(table_name)
  _on_temporal_schema { super }
end

#change_table(table_name, options = {}, &block) ⇒ Object

If changing a temporal table, redirect the change to the table in the temporal schema and recreate views.

If the `:temporal` option is specified, enables or disables temporal features on the given table. Please note that you'll lose your history when demoting a temporal table to a plain one.


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/chrono_model/adapter.rb', line 124

def change_table(table_name, options = {}, &block)
  transaction do

    # Add an empty proc to support calling change_table without a block.
    #
    block ||= proc { }

    if options[:temporal] == true
      if !is_chrono?(table_name)
        # Add temporal features to this table
        #
        if !primary_key(table_name)
          execute "ALTER TABLE #{table_name} ADD __chrono_id SERIAL PRIMARY KEY"
        end

        execute "ALTER TABLE #{table_name} SET SCHEMA #{TEMPORAL_SCHEMA}"
        _on_history_schema { chrono_create_history_for(table_name) }
        chrono_create_view_for(table_name, options)
        copy_indexes_to_history_for(table_name)

        # Optionally copy the plain table data, setting up history
        # retroactively.
        #
        if options[:copy_data]
          seq  = _on_history_schema { serial_sequence(table_name, primary_key(table_name)) }
          from = options[:validity] || '0001-01-01 00:00:00'

          execute %[
            INSERT INTO #{HISTORY_SCHEMA}.#{table_name}
            SELECT *,
              nextval('#{seq}')        AS hid,
              tsrange('#{from}', NULL) AS validity,
              timezone('UTC', now())   AS recorded_at
            FROM #{TEMPORAL_SCHEMA}.#{table_name}
          ]
        end
      end

      chrono_alter(table_name, options) { super table_name, options, &block }
    else
      if options[:temporal] == false && is_chrono?(table_name)
        # Remove temporal features from this table
        #
        execute "DROP VIEW #{table_name}"

        chrono_drop_trigger_functions_for(table_name)

        _on_history_schema { execute "DROP TABLE #{table_name}" }

        default_schema = select_value 'SELECT current_schema()'
        _on_temporal_schema do
          if primary_key(table_name) == '__chrono_id'
            execute "ALTER TABLE #{table_name} DROP __chrono_id"
          end

          execute "ALTER TABLE #{table_name} SET SCHEMA #{default_schema}"
        end
      end

      super table_name, options, &block
    end
  end
end

#chrono_setup!Object


464
465
466
467
468
# File 'lib/chrono_model/adapter.rb', line 464

def chrono_setup!
  chrono_create_schemas

  chrono_upgrade_structure!
end

#chrono_supported?Boolean

Returns true whether the connection adapter supports our implementation of temporal tables. Currently, Chronomodel is supported starting with PostgreSQL 9.3.

Returns:

  • (Boolean)

26
27
28
# File 'lib/chrono_model/adapter.rb', line 26

def chrono_supported?
  postgresql_version >= 90300
end

#copy_indexes_to_history_for(table_name) ⇒ Object

Copy the indexes from the temporal table to the history table if the indexes are not already created with the same name.

Uniqueness is voluntarily ignored, as it doesn't make sense on history tables.

Ref: GitHub pull #21.


525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
# File 'lib/chrono_model/adapter.rb', line 525

def copy_indexes_to_history_for(table_name)
  history_indexes  = _on_history_schema  { indexes(table_name) }.map(&:name)
  temporal_indexes = _on_temporal_schema { indexes(table_name) }

  temporal_indexes.each do |index|
    next if history_indexes.include?(index.name)

    _on_history_schema do
      execute %[
        CREATE INDEX #{index.name} ON #{table_name}
        USING #{index.using} ( #{index.columns.join(', ')} )
      ], 'Copy index from temporal to history'
    end
  end
end

#create_table(table_name, options = {}) ⇒ Object

Creates the given table, possibly creating the temporal schema objects if the `:temporal` option is given and set to true.


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/chrono_model/adapter.rb', line 33

def create_table(table_name, options = {})
  # No temporal features requested, skip
  return super unless options[:temporal]

  if options[:id] == false
    logger.warn "WARNING - Temporal Temporal tables require a primary key."
    logger.warn "WARNING - Creating a \"__chrono_id\" primary key to fulfill the requirement"

    options[:id] = '__chrono_id'
  end

  transaction do
    _on_temporal_schema { super }
    _on_history_schema { chrono_create_history_for(table_name) }

    chrono_create_view_for(table_name, options)
  end
end

#drop_table(table_name) ⇒ Object

If dropping a temporal table, drops it from the temporal schema adding the CASCADE option so to delete the history, view and triggers.


191
192
193
194
195
196
197
# File 'lib/chrono_model/adapter.rb', line 191

def drop_table(table_name, *)
  return super unless is_chrono?(table_name)

  _on_temporal_schema { execute "DROP TABLE #{table_name} CASCADE" }

  chrono_drop_trigger_functions_for(table_name)
end

#initialize_type_map(type_map) ⇒ Object


508
509
510
511
512
513
514
515
# File 'lib/chrono_model/adapter.rb', line 508

def initialize_type_map(type_map)
  super.tap do
    ar_type = type_map.fetch(TSRange::OID)
    cm_type = TSRange.new(ar_type.subtype, ar_type.type)

    type_map.register_type TSRange::OID, cm_type
  end
end

#is_chrono?(table) ⇒ Boolean

Returns true if the given name references a temporal table.

Returns:

  • (Boolean)

442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/chrono_model/adapter.rb', line 442

def is_chrono?(table)
  _on_temporal_schema { chrono_data_source_exists?(table) } &&
    _on_history_schema { chrono_data_source_exists?(table) }

rescue ActiveRecord::StatementInvalid => e
  # means that we could not change the search path to check for
  # table existence
  if is_exception_class?(e, PG::InvalidSchemaName, PG::InvalidParameterValue)
    return false
  else
    raise e
  end
end

#is_exception_class?(e, *klasses) ⇒ Boolean

Returns:

  • (Boolean)

456
457
458
459
460
461
462
# File 'lib/chrono_model/adapter.rb', line 456

def is_exception_class?(e, *klasses)
  if e.respond_to?(:original_exception)
    klasses.any? { |k| e.is_a?(k) }
  else
    klasses.any? { |k| e.message =~ /#{k.name}/ }
  end
end

#on_schema(schema, nesting = true, &block) ⇒ Object

Evaluates the given block in the given schema search path.

By default, nested call are allowed, to disable this feature pass false as the second parameter.


410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
# File 'lib/chrono_model/adapter.rb', line 410

def on_schema(schema, nesting = true, &block)
  @_on_schema_nesting = (@_on_schema_nesting || 0) + 1

  if nesting || @_on_schema_nesting == 1
    old_path = self.schema_search_path
    self.schema_search_path = schema
  end

  block.call

ensure
  if (nesting || @_on_schema_nesting == 1)

    # If the transaction is aborted, any execute() call will raise
    # "transaction is aborted errors" - thus calling the Adapter's
    # setter won't update the memoized variable.
    #
    # Here we reset it to +nil+ to refresh it on the next call, as
    # there is no way to know which path will be restored when the
    # transaction ends.
    #
    if @connection.transaction_status == PG::Connection::PQTRANS_INERROR
      @schema_search_path = nil
    else
      self.schema_search_path = old_path
    end
  end
  @_on_schema_nesting -= 1
end

#remove_column(table_name) ⇒ Object

If removing a column from a temporal table, we are forced to drop the view, then drop the column from the table in the temporal schema and eventually recreate the triggers.


286
287
288
289
# File 'lib/chrono_model/adapter.rb', line 286

def remove_column(table_name, *)
  return super unless is_chrono?(table_name)
  chrono_alter(table_name) { super }
end

#remove_index(table_name) ⇒ Object

If removing an index from a temporal table, remove it both from the temporal and the history schemas.


219
220
221
222
223
224
225
226
# File 'lib/chrono_model/adapter.rb', line 219

def remove_index(table_name, *)
  return super unless is_chrono?(table_name)

  transaction do
    _on_temporal_schema { super }
    _on_history_schema { super }
  end
end

#remove_temporal_indexes(table, range, options = {}) ⇒ Object


353
354
355
356
357
358
359
# File 'lib/chrono_model/adapter.rb', line 353

def remove_temporal_indexes(table, range, options = {})
  indexes = temporal_index_names(table, range, options)

  chrono_alter_index(table, options) do
    indexes.each {|idx| execute "DROP INDEX #{idx}" }
  end
end

#remove_timeline_consistency_constraint(table, options = {}) ⇒ Object


390
391
392
393
394
395
396
397
398
# File 'lib/chrono_model/adapter.rb', line 390

def remove_timeline_consistency_constraint(table, options = {})
  name = timeline_consistency_constraint_name(options[:prefix] || table)

  chrono_alter_constraint(table, options) do
    execute <<-SQL
      ALTER TABLE #{table} DROP CONSTRAINT #{name}
    SQL
  end
end

#rename_column(table_name) ⇒ Object

If renaming a column of a temporal table, rename it in the table in the temporal schema and update the triggers.


246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/chrono_model/adapter.rb', line 246

def rename_column(table_name, *)
  return super unless is_chrono?(table_name)

  # Rename the column in the temporal table and in the view
  transaction do
    _on_temporal_schema { super }
    super

    # Update the triggers
    chrono_create_view_for(table_name)
  end
end

#rename_table(name, new_name) ⇒ Object

If renaming a temporal table, rename the history and view as well.


54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/chrono_model/adapter.rb', line 54

def rename_table(name, new_name)
  return super unless is_chrono?(name)

  clear_cache!

  transaction do
    # Rename tables
    #
    [TEMPORAL_SCHEMA, HISTORY_SCHEMA].each do |schema|
      on_schema(schema) do
        seq     = serial_sequence(name, primary_key(name))
        new_seq = seq.sub(name.to_s, new_name.to_s).split('.').last

        execute "ALTER SEQUENCE #{seq}  RENAME TO #{new_seq}"
        execute "ALTER TABLE    #{name} RENAME TO #{new_name}"
      end
    end


    # Rename indexes on history schema
    #
    _on_history_schema do
      standard_index_names = %w(
        inherit_pkey instance_history pkey
        recorded_at timeline_consistency )

      old_names = temporal_index_names(name, :validity) +
        standard_index_names.map {|i| [name, i].join('_') }

      new_names = temporal_index_names(new_name, :validity) +
        standard_index_names.map {|i| [new_name, i].join('_') }

      old_names.zip(new_names).each do |old, new|
        execute "ALTER INDEX #{old} RENAME TO #{new}"
      end
    end

    # Rename indexes on temporal schema
    #
    _on_temporal_schema do
      temporal_indexes =  indexes(new_name)
      temporal_indexes.map(&:name).each do |old_idx_name|
        if old_idx_name =~ /^index_#{name}_on_(?<columns>.+)/
          new_idx_name = "index_#{new_name}_on_#{$~['columns']}"
          execute "ALTER INDEX #{old_idx_name} RENAME TO #{new_idx_name}"
        end
      end
    end

    # Drop view
    #
    execute "DROP VIEW #{name}"

    # Drop functions
    #
    chrono_drop_trigger_functions_for(name)

    # Create view and functions
    #
    chrono_create_view_for(new_name)
  end
end

#temporal_index_names(table, range, options = {}) ⇒ Object


361
362
363
364
365
366
367
368
369
370
371
# File 'lib/chrono_model/adapter.rb', line 361

def temporal_index_names(table, range, options = {})
  prefix = options[:name].presence || "index_#{table}_temporal"

  # When creating computed indexes (e.g. ends_on::timestamp + time
  # '23:59:59'), remove everything following the field name.
  range = range.to_s.sub(/\W.*/, '')

  [range, "lower_#{range}", "upper_#{range}"].map do |suffix|
    [prefix, 'on', suffix].join('_')
  end
end

#timeline_consistency_constraint_name(table) ⇒ Object


400
401
402
# File 'lib/chrono_model/adapter.rb', line 400

def timeline_consistency_constraint_name(table)
  "#{table}_timeline_consistency"
end