Class: PgOnlineSchemaChange::Query

Inherits:
Object
  • Object
show all
Extended by:
Helper
Defined in:
lib/pg_online_schema_change/query.rb

Constant Summary collapse

INDEX_SUFFIX =
"_pgosc"
DROPPED_COLUMN_TYPE =
:AT_DropColumn
RENAMED_COLUMN_TYPE =
:AT_RenameColumn
LOCK_ATTEMPT =
4

Class Method Summary collapse

Methods included from Helper

logger, method_missing, primary_key, respond_to_missing?

Class Method Details

.alter_statement?(query) ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/pg_online_schema_change/query.rb', line 16

def alter_statement?(query)
  PgQuery
    .parse(query)
    .tree
    .stmts
    .all? do |statement|
      statement.stmt.alter_table_stmt.instance_of?(PgQuery::AlterTableStmt) ||
        statement.stmt.rename_stmt.instance_of?(PgQuery::RenameStmt)
    end
rescue PgQuery::ParseError
  false
end

.alter_statement_for(client, shadow_table) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/pg_online_schema_change/query.rb', line 111

def alter_statement_for(client, shadow_table)
  parsed_query = PgQuery.parse(client.alter_statement)

  parsed_query.tree.stmts.each do |statement|
    if statement.stmt.alter_table_stmt
      statement.stmt.alter_table_stmt.relation.relname = shadow_table
    end

    statement.stmt.rename_stmt.relation.relname = shadow_table if statement.stmt.rename_stmt
  end
  parsed_query.deparse
end

.copy_data_statement(client, shadow_table, reuse_trasaction = false) ⇒ Object



381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/pg_online_schema_change/query.rb', line 381

def copy_data_statement(client, shadow_table, reuse_trasaction = false)
  select_columns =
    table_columns(client, client.table_name, reuse_trasaction).map do |entry|
      entry["column_name_regular"]
    end

  select_columns -= dropped_columns_list if dropped_columns_list.any?

  insert_into_columns = select_columns.dup

  if renamed_columns_list.any?
    renamed_columns_list.each do |obj|
      insert_into_columns.each_with_index do |insert_into_column, index|
        insert_into_columns[index] = obj[:new_name] if insert_into_column == obj[:old_name]
      end
    end
  end

  insert_into_columns.map! do |insert_into_column|
    client.connection.quote_ident(insert_into_column)
  end

  select_columns.map! { |select_column| client.connection.quote_ident(select_column) }

  <<~SQL
    INSERT INTO #{shadow_table}(#{insert_into_columns.join(", ")})
    SELECT #{select_columns.join(", ")}
    FROM ONLY #{client.table_name}
  SQL
end

.dropped_columns(client) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/pg_online_schema_change/query.rb', line 245

def dropped_columns(client)
  PgQuery
    .parse(client.alter_statement)
    .tree
    .stmts
    .map do |statement|
      next if statement.stmt.alter_table_stmt.nil?

      statement.stmt.alter_table_stmt.cmds.map do |cmd|
        cmd.alter_table_cmd.name if cmd.alter_table_cmd.subtype == DROPPED_COLUMN_TYPE
      end
    end
    .flatten
    .compact
end

.get_all_constraints_for(client) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/pg_online_schema_change/query.rb', line 160

def get_all_constraints_for(client)
  query = <<~SQL
    SELECT  conrelid::regclass AS table_on,
            confrelid::regclass AS table_from,
            contype as constraint_type,
            conname AS constraint_name,
            convalidated AS constraint_validated,
            pg_get_constraintdef(oid) AS definition
    FROM   	pg_constraint
    WHERE  	contype IN ('f', 'p')
  SQL

  constraints = []
  run(client.connection, query) { |result| constraints = result.map { |row| row } }

  constraints
end

.get_foreign_keys_for(client, table) ⇒ Object



184
185
186
187
188
# File 'lib/pg_online_schema_change/query.rb', line 184

def get_foreign_keys_for(client, table)
  get_all_constraints_for(client).select do |row|
    row["table_on"] == table && row["constraint_type"] == "f"
  end
end

.get_foreign_keys_to_validate(client, table) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/pg_online_schema_change/query.rb', line 232

def get_foreign_keys_to_validate(client, table)
  constraints = get_all_constraints_for(client)
  referential_foreign_keys =
    constraints.select { |row| row["table_from"] == table && row["constraint_type"] == "f" }

  self_foreign_keys =
    constraints.select { |row| row["table_on"] == table && row["constraint_type"] == "f" }

  [referential_foreign_keys, self_foreign_keys].flatten.map do |row|
    "ALTER TABLE #{row["table_on"]} VALIDATE CONSTRAINT #{row["constraint_name"]};"
  end
end

.get_indexes_for(client, table) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/pg_online_schema_change/query.rb', line 124

def get_indexes_for(client, table)
  query = <<~SQL
    SELECT indexdef, schemaname
    FROM pg_indexes
    WHERE schemaname = '#{client.schema}' AND tablename = '#{table}'
  SQL

  indexes = []
  run(client.connection, query) { |result| indexes = result.map { |row| row["indexdef"] } }

  indexes
end

.get_primary_keys_for(client, table) ⇒ Object



178
179
180
181
182
# File 'lib/pg_online_schema_change/query.rb', line 178

def get_primary_keys_for(client, table)
  get_all_constraints_for(client).select do |row|
    row["table_on"] == table && row["constraint_type"] == "p"
  end
end

.get_sequence_name(client, table, column) ⇒ Object

fetches the sequence name of a table and column combination



138
139
140
141
142
143
144
145
146
# File 'lib/pg_online_schema_change/query.rb', line 138

def get_sequence_name(client, table, column)
  query = <<~SQL
    SELECT pg_get_serial_sequence('#{table}', '#{column}');
  SQL

  run(client.connection, query) do |result|
    result.map { |row| row["pg_get_serial_sequence"] }
  end.first
end

.get_table_size(connection, schema, table_name) ⇒ Object



432
433
434
435
436
437
438
439
# File 'lib/pg_online_schema_change/query.rb', line 432

def get_table_size(connection, schema, table_name)
  size_query = "SELECT pg_table_size('#{schema}.#{table_name}');"
  result = run(connection, size_query).first
  result["pg_table_size"].to_i
rescue StandardError => e
  logger.error("Error getting table size: #{e.message}")
  0
end

.get_triggers_for(client, table) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
# File 'lib/pg_online_schema_change/query.rb', line 148

def get_triggers_for(client, table)
  query = <<~SQL
    SELECT pg_get_triggerdef(oid) as tdef FROM pg_trigger
    WHERE  tgrelid = '#{client.schema}.#{table}'::regclass AND tgisinternal = FALSE;
  SQL

  triggers = []
  run(client.connection, query) { |result| triggers = result.map { |row| "#{row["tdef"]};" } }

  triggers.join(";")
end

.kill_backends(client, table) ⇒ Object



369
370
371
372
373
374
375
376
377
378
379
# File 'lib/pg_online_schema_change/query.rb', line 369

def kill_backends(client, table)
  return unless client.kill_backends

  logger.info("Terminating other backends")

  query = <<~SQL
    SELECT pg_terminate_backend(pid) FROM pg_locks WHERE locktype = 'relation' AND relation = '#{table}'::regclass::oid AND pid <> pg_backend_pid()
  SQL

  run(client.connection, query, true)
end

.open_lock_exclusive(client, table) ⇒ Object

This function acquires the lock and keeps the transaction open. If a lock is acquired, its upon the caller to call COMMIT to end the transaction. If a lock is not acquired, transaction is closed and a new transaction is started to acquire lock again



343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/pg_online_schema_change/query.rb', line 343

def open_lock_exclusive(client, table)
  attempts ||= 1

  query = <<~SQL
    SET lock_timeout = '#{client.wait_time_for_lock}s';
    LOCK TABLE #{client.table_name} IN ACCESS EXCLUSIVE MODE;
  SQL
  run(client.connection, query, true)

  true
rescue PG::LockNotAvailable, PG::InFailedSqlTransaction
  if (attempts += 1) < LOCK_ATTEMPT
    logger.info("Couldn't acquire lock, attempt: #{attempts}")

    run(client.connection, "RESET lock_timeout;")
    kill_backends(client, table)

    retry
  end

  logger.info("Lock acquire failed")
  run(client.connection, "RESET lock_timeout;")

  false
end

.primary_key_for(client, table) ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/pg_online_schema_change/query.rb', line 278

def primary_key_for(client, table)
  query = <<~SQL
    SELECT
      pg_attribute.attname as column_name
    FROM pg_index, pg_class, pg_attribute, pg_namespace
    WHERE
      pg_class.oid = '#{table}'::regclass AND
      indrelid = pg_class.oid AND
      nspname = '#{client.schema}' AND
      pg_class.relnamespace = pg_namespace.oid AND
      pg_attribute.attrelid = pg_class.oid AND
      pg_attribute.attnum = any(pg_index.indkey)
    AND indisprimary
  SQL

  columns = []
  run(client.connection, query) { |result| columns = result.map { |row| row["column_name"] } }

  columns.first
end

.primary_key_sequence(shadow_table, primary_key, opened) ⇒ Object



412
413
414
415
416
417
418
419
420
# File 'lib/pg_online_schema_change/query.rb', line 412

def primary_key_sequence(shadow_table, primary_key, opened)
  query = <<~SQL
    SELECT pg_get_serial_sequence('#{shadow_table}', '#{primary_key}') as sequence_name
  SQL

  result = run(client.connection, query, opened)

  result.map { |row| row["sequence_name"] }&.first
end

.query_for_primary_key_refresh(shadow_table, primary_key, table, opened) ⇒ Object



422
423
424
425
426
427
428
429
430
# File 'lib/pg_online_schema_change/query.rb', line 422

def query_for_primary_key_refresh(shadow_table, primary_key, table, opened)
  sequence_name = primary_key_sequence(shadow_table, primary_key, opened)

  return "" if sequence_name.nil?

  <<~SQL
    SELECT setval((select pg_get_serial_sequence('#{shadow_table}', '#{primary_key}')), (SELECT max(#{primary_key}) FROM #{table}));
  SQL
end

.referential_foreign_keys_to_refresh(client, table) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/pg_online_schema_change/query.rb', line 190

def referential_foreign_keys_to_refresh(client, table)
  references =
    get_all_constraints_for(client).select do |row|
      row["table_from"] == table && row["constraint_type"] == "f"
    end

  references
    .map do |row|
      add_statement =
        if row["definition"].end_with?("NOT VALID")
          "ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]};"
        else
          "ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]} NOT VALID;"
        end

      drop_statement =
        "ALTER TABLE #{row["table_on"]} DROP CONSTRAINT #{row["constraint_name"]};"

      "#{drop_statement} #{add_statement}"
    end
    .join
end

.renamed_columns(client) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/pg_online_schema_change/query.rb', line 261

def renamed_columns(client)
  PgQuery
    .parse(client.alter_statement)
    .tree
    .stmts
    .map do |statement|
      next if statement.stmt.rename_stmt.nil?

      {
        old_name: statement.stmt.rename_stmt.subname,
        new_name: statement.stmt.rename_stmt.newname,
      }
    end
    .flatten
    .compact
end

.run(connection, query, reuse_trasaction = false, &block) ⇒ Object

rubocop:disable Style/ArgumentsForwarding



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/pg_online_schema_change/query.rb', line 69

def run(connection, query, reuse_trasaction = false, &block) # rubocop:disable Style/ArgumentsForwarding
  if [PG::PQTRANS_INERROR, PG::PQTRANS_UNKNOWN].include?(connection.transaction_status)
    connection.cancel
  end

  logger.debug("Running query", { query: query })

  connection.async_exec("BEGIN;")

  result = connection.async_exec(query, &block) # rubocop:disable Style/ArgumentsForwarding
rescue Exception # rubocop:disable Lint/RescueException
  connection.cancel if connection.transaction_status != PG::PQTRANS_IDLE
  connection.block
  logger.info("Exception raised, rolling back query", { rollback: true, query: query })
  connection.async_exec("ROLLBACK;")
  raise
else
  connection.async_exec("COMMIT;") unless reuse_trasaction
  result
end

.same_table?(query) ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pg_online_schema_change/query.rb', line 29

def same_table?(query)
  tables =
    PgQuery
      .parse(query)
      .tree
      .stmts
      .filter_map do |statement|
        if statement.stmt.alter_table_stmt.instance_of?(PgQuery::AlterTableStmt)
          statement.stmt.alter_table_stmt.relation.relname
        elsif statement.stmt.rename_stmt.instance_of?(PgQuery::RenameStmt)
          statement.stmt.rename_stmt.relation.relname
        end
      end

  tables.uniq.count == 1
rescue PgQuery::ParseError
  false
end

.self_foreign_keys_to_refresh(client, table) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/pg_online_schema_change/query.rb', line 213

def self_foreign_keys_to_refresh(client, table)
  references =
    get_all_constraints_for(client).select do |row|
      row["table_on"] == table && row["constraint_type"] == "f"
    end

  references
    .map do |row|
      add_statement =
        if row["definition"].end_with?("NOT VALID")
          "ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]};"
        else
          "ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]} NOT VALID;"
        end
      add_statement
    end
    .join
end

.storage_parameters_for(client, table, reuse_trasaction = false) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/pg_online_schema_change/query.rb', line 299

def storage_parameters_for(client, table, reuse_trasaction = false)
  query = <<~SQL
    SELECT array_to_string(reloptions, ',') as params FROM pg_class WHERE relname='#{table}';
  SQL

  columns = []
  run(client.connection, query, reuse_trasaction) do |result|
    columns = result.map { |row| row["params"] }
  end

  columns.first
end

.table(query) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pg_online_schema_change/query.rb', line 48

def table(query)
  from_rename_statement =
    PgQuery
      .parse(query)
      .tree
      .stmts
      .filter_map { |statement| statement.stmt.rename_stmt&.relation&.relname }[
      0
    ]
  PgQuery.parse(query).tables[0] || from_rename_statement
end

.table_columns(client, table = nil, reuse_trasaction = false) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/pg_online_schema_change/query.rb', line 90

def table_columns(client, table = nil, reuse_trasaction = false)
  sql = <<~SQL
    SELECT attname as column_name, format_type(atttypid, atttypmod) as type, attnum as column_position FROM   pg_attribute
    WHERE  attrelid = '#{table || client.table_name}'::regclass AND attnum > 0 AND NOT attisdropped
    ORDER  BY attnum;
  SQL
  mapped_columns = []

  run(client.connection, sql, reuse_trasaction) do |result|
    mapped_columns =
      result.map do |row|
        row["column_name_regular"] = row["column_name"]
        row["column_name"] = client.connection.quote_ident(row["column_name"])
        row["column_position"] = row["column_position"].to_i
        row
      end
  end

  mapped_columns
end

.table_name(query, table) ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/pg_online_schema_change/query.rb', line 60

def table_name(query, table)
  table_name = "\"#{table}\""
  if table =~ /[A-Z]/ && query.include?(table_name) && table[0] != '"'
    table_name
  else
    table
  end
end

.view_definitions_for(client, table) ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/pg_online_schema_change/query.rb', line 312

def view_definitions_for(client, table)
  query = <<~SQL
    SELECT DISTINCT
      dependent_view.relname AS view_name,
      pg_get_viewdef(dependent_view.oid) AS view_definition,
      view_ns.nspname AS schema_name
    FROM pg_class AS source_table
    JOIN pg_depend ON pg_depend.refobjid = source_table.oid
    JOIN pg_rewrite ON pg_rewrite.oid = pg_depend.objid
    JOIN pg_class AS dependent_view ON dependent_view.oid = pg_rewrite.ev_class
    JOIN pg_namespace AS view_ns ON dependent_view.relnamespace = view_ns.oid
    AND dependent_view.relkind = 'v'
    AND source_table.relname = '#{table}';
  SQL

  definitions = []
  run(client.connection, query) do |result|
    definitions =
      result.map do |row|
        { "\"#{row["schema_name"]}\".#{row["view_name"]}" => row["view_definition"].strip }
      end
  end

  definitions
end