Class: PgOnlineSchemaChange::Query
- Inherits:
-
Object
- Object
- PgOnlineSchemaChange::Query
show all
- Extended by:
- Helper
- Defined in:
- lib/pg_online_schema_change/query.rb
Constant Summary
collapse
- INDEX_SUFFIX =
"_pgosc"
- DROPPED_COLUMN_TYPE =
:AT_DropColumn
- RENAMED_COLUMN_TYPE =
:AT_RenameColumn
- LOCK_ATTEMPT =
4
Class Method Summary
collapse
-
.alter_statement?(query) ⇒ Boolean
-
.alter_statement_for(client, shadow_table) ⇒ Object
-
.copy_data_statement(client, shadow_table, reuse_trasaction = false) ⇒ Object
-
.dropped_columns(client) ⇒ Object
-
.get_all_constraints_for(client) ⇒ Object
-
.get_foreign_keys_for(client, table) ⇒ Object
-
.get_foreign_keys_to_validate(client, table) ⇒ Object
-
.get_indexes_for(client, table) ⇒ Object
-
.get_primary_keys_for(client, table) ⇒ Object
-
.get_sequence_name(client, table, column) ⇒ Object
fetches the sequence name of a table and column combination.
-
.get_table_size(connection, schema, table_name) ⇒ Object
-
.get_triggers_for(client, table) ⇒ Object
-
.kill_backends(client, table) ⇒ Object
-
.open_lock_exclusive(client, table) ⇒ Object
This function acquires the lock and keeps the transaction open.
-
.primary_key_for(client, table) ⇒ Object
-
.primary_key_sequence(shadow_table, primary_key, opened) ⇒ Object
-
.query_for_primary_key_refresh(shadow_table, primary_key, table, opened) ⇒ Object
-
.referential_foreign_keys_to_refresh(client, table) ⇒ Object
-
.renamed_columns(client) ⇒ Object
-
.run(connection, query, reuse_trasaction = false, &block) ⇒ Object
rubocop:disable Style/ArgumentsForwarding.
-
.same_table?(query) ⇒ Boolean
-
.self_foreign_keys_to_refresh(client, table) ⇒ Object
-
.storage_parameters_for(client, table, reuse_trasaction = false) ⇒ Object
-
.table(query) ⇒ Object
-
.table_columns(client, table = nil, reuse_trasaction = false) ⇒ Object
-
.table_name(query, table) ⇒ Object
-
.view_definitions_for(client, table) ⇒ Object
Methods included from Helper
logger, method_missing, primary_key, respond_to_missing?
Class Method Details
.alter_statement?(query) ⇒ Boolean
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/pg_online_schema_change/query.rb', line 16
def alter_statement?(query)
PgQuery
.parse(query)
.tree
.stmts
.all? do |statement|
statement.stmt.alter_table_stmt.instance_of?(PgQuery::AlterTableStmt) ||
statement.stmt.rename_stmt.instance_of?(PgQuery::RenameStmt)
end
rescue PgQuery::ParseError
false
end
|
.alter_statement_for(client, shadow_table) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/pg_online_schema_change/query.rb', line 111
def alter_statement_for(client, shadow_table)
parsed_query = PgQuery.parse(client.alter_statement)
parsed_query.tree.stmts.each do |statement|
if statement.stmt.alter_table_stmt
statement.stmt.alter_table_stmt.relation.relname = shadow_table
end
statement.stmt.rename_stmt.relation.relname = shadow_table if statement.stmt.rename_stmt
end
parsed_query.deparse
end
|
.copy_data_statement(client, shadow_table, reuse_trasaction = false) ⇒ Object
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
|
# File 'lib/pg_online_schema_change/query.rb', line 381
def copy_data_statement(client, shadow_table, reuse_trasaction = false)
select_columns =
table_columns(client, client.table_name, reuse_trasaction).map do |entry|
entry["column_name_regular"]
end
select_columns -= dropped_columns_list if dropped_columns_list.any?
insert_into_columns = select_columns.dup
if renamed_columns_list.any?
renamed_columns_list.each do |obj|
insert_into_columns.each_with_index do |insert_into_column, index|
insert_into_columns[index] = obj[:new_name] if insert_into_column == obj[:old_name]
end
end
end
insert_into_columns.map! do |insert_into_column|
client.connection.quote_ident(insert_into_column)
end
select_columns.map! { |select_column| client.connection.quote_ident(select_column) }
<<~SQL
INSERT INTO #{shadow_table}(#{insert_into_columns.join(", ")})
SELECT #{select_columns.join(", ")}
FROM ONLY #{client.table_name}
SQL
end
|
.dropped_columns(client) ⇒ Object
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
|
# File 'lib/pg_online_schema_change/query.rb', line 245
def dropped_columns(client)
PgQuery
.parse(client.alter_statement)
.tree
.stmts
.map do |statement|
next if statement.stmt.alter_table_stmt.nil?
statement.stmt.alter_table_stmt.cmds.map do |cmd|
cmd.alter_table_cmd.name if cmd.alter_table_cmd.subtype == DROPPED_COLUMN_TYPE
end
end
.flatten
.compact
end
|
.get_all_constraints_for(client) ⇒ Object
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/pg_online_schema_change/query.rb', line 160
def get_all_constraints_for(client)
query = <<~SQL
SELECT conrelid::regclass AS table_on,
confrelid::regclass AS table_from,
contype as constraint_type,
conname AS constraint_name,
convalidated AS constraint_validated,
pg_get_constraintdef(oid) AS definition
FROM pg_constraint
WHERE contype IN ('f', 'p')
SQL
constraints = []
run(client.connection, query) { |result| constraints = result.map { |row| row } }
constraints
end
|
.get_foreign_keys_for(client, table) ⇒ Object
184
185
186
187
188
|
# File 'lib/pg_online_schema_change/query.rb', line 184
def get_foreign_keys_for(client, table)
get_all_constraints_for(client).select do |row|
row["table_on"] == table && row["constraint_type"] == "f"
end
end
|
.get_foreign_keys_to_validate(client, table) ⇒ Object
232
233
234
235
236
237
238
239
240
241
242
243
|
# File 'lib/pg_online_schema_change/query.rb', line 232
def get_foreign_keys_to_validate(client, table)
constraints = get_all_constraints_for(client)
referential_foreign_keys =
constraints.select { |row| row["table_from"] == table && row["constraint_type"] == "f" }
self_foreign_keys =
constraints.select { |row| row["table_on"] == table && row["constraint_type"] == "f" }
[referential_foreign_keys, self_foreign_keys].flatten.map do |row|
"ALTER TABLE #{row["table_on"]} VALIDATE CONSTRAINT #{row["constraint_name"]};"
end
end
|
.get_indexes_for(client, table) ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/pg_online_schema_change/query.rb', line 124
def get_indexes_for(client, table)
query = <<~SQL
SELECT indexdef, schemaname
FROM pg_indexes
WHERE schemaname = '#{client.schema}' AND tablename = '#{table}'
SQL
indexes = []
run(client.connection, query) { |result| indexes = result.map { |row| row["indexdef"] } }
indexes
end
|
.get_primary_keys_for(client, table) ⇒ Object
178
179
180
181
182
|
# File 'lib/pg_online_schema_change/query.rb', line 178
def get_primary_keys_for(client, table)
get_all_constraints_for(client).select do |row|
row["table_on"] == table && row["constraint_type"] == "p"
end
end
|
.get_sequence_name(client, table, column) ⇒ Object
fetches the sequence name of a table and column combination
138
139
140
141
142
143
144
145
146
|
# File 'lib/pg_online_schema_change/query.rb', line 138
def get_sequence_name(client, table, column)
query = <<~SQL
SELECT pg_get_serial_sequence('#{table}', '#{column}');
SQL
run(client.connection, query) do |result|
result.map { |row| row["pg_get_serial_sequence"] }
end.first
end
|
.get_table_size(connection, schema, table_name) ⇒ Object
432
433
434
435
436
437
438
439
|
# File 'lib/pg_online_schema_change/query.rb', line 432
def get_table_size(connection, schema, table_name)
size_query = "SELECT pg_table_size('#{schema}.#{table_name}');"
result = run(connection, size_query).first
result["pg_table_size"].to_i
rescue StandardError => e
logger.error("Error getting table size: #{e.message}")
0
end
|
.get_triggers_for(client, table) ⇒ Object
148
149
150
151
152
153
154
155
156
157
158
|
# File 'lib/pg_online_schema_change/query.rb', line 148
def get_triggers_for(client, table)
query = <<~SQL
SELECT pg_get_triggerdef(oid) as tdef FROM pg_trigger
WHERE tgrelid = '#{client.schema}.#{table}'::regclass AND tgisinternal = FALSE;
SQL
triggers = []
run(client.connection, query) { |result| triggers = result.map { |row| "#{row["tdef"]};" } }
triggers.join(";")
end
|
.kill_backends(client, table) ⇒ Object
369
370
371
372
373
374
375
376
377
378
379
|
# File 'lib/pg_online_schema_change/query.rb', line 369
def kill_backends(client, table)
return unless client.kill_backends
logger.info("Terminating other backends")
query = <<~SQL
SELECT pg_terminate_backend(pid) FROM pg_locks WHERE locktype = 'relation' AND relation = '#{table}'::regclass::oid AND pid <> pg_backend_pid()
SQL
run(client.connection, query, true)
end
|
.open_lock_exclusive(client, table) ⇒ Object
This function acquires the lock and keeps the transaction open. If a lock is acquired, its upon the caller to call COMMIT to end the transaction. If a lock is not acquired, transaction is closed and a new transaction is started to acquire lock again
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
|
# File 'lib/pg_online_schema_change/query.rb', line 343
def open_lock_exclusive(client, table)
attempts ||= 1
query = <<~SQL
SET lock_timeout = '#{client.wait_time_for_lock}s';
LOCK TABLE #{client.table_name} IN ACCESS EXCLUSIVE MODE;
SQL
run(client.connection, query, true)
true
rescue PG::LockNotAvailable, PG::InFailedSqlTransaction
if (attempts += 1) < LOCK_ATTEMPT
logger.info("Couldn't acquire lock, attempt: #{attempts}")
run(client.connection, "RESET lock_timeout;")
kill_backends(client, table)
retry
end
logger.info("Lock acquire failed")
run(client.connection, "RESET lock_timeout;")
false
end
|
.primary_key_for(client, table) ⇒ Object
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
|
# File 'lib/pg_online_schema_change/query.rb', line 278
def primary_key_for(client, table)
query = <<~SQL
SELECT
pg_attribute.attname as column_name
FROM pg_index, pg_class, pg_attribute, pg_namespace
WHERE
pg_class.oid = '#{table}'::regclass AND
indrelid = pg_class.oid AND
nspname = '#{client.schema}' AND
pg_class.relnamespace = pg_namespace.oid AND
pg_attribute.attrelid = pg_class.oid AND
pg_attribute.attnum = any(pg_index.indkey)
AND indisprimary
SQL
columns = []
run(client.connection, query) { |result| columns = result.map { |row| row["column_name"] } }
columns.first
end
|
.primary_key_sequence(shadow_table, primary_key, opened) ⇒ Object
412
413
414
415
416
417
418
419
420
|
# File 'lib/pg_online_schema_change/query.rb', line 412
def primary_key_sequence(shadow_table, primary_key, opened)
query = <<~SQL
SELECT pg_get_serial_sequence('#{shadow_table}', '#{primary_key}') as sequence_name
SQL
result = run(client.connection, query, opened)
result.map { |row| row["sequence_name"] }&.first
end
|
.query_for_primary_key_refresh(shadow_table, primary_key, table, opened) ⇒ Object
422
423
424
425
426
427
428
429
430
|
# File 'lib/pg_online_schema_change/query.rb', line 422
def query_for_primary_key_refresh(shadow_table, primary_key, table, opened)
sequence_name = primary_key_sequence(shadow_table, primary_key, opened)
return "" if sequence_name.nil?
<<~SQL
SELECT setval((select pg_get_serial_sequence('#{shadow_table}', '#{primary_key}')), (SELECT max(#{primary_key}) FROM #{table}));
SQL
end
|
.referential_foreign_keys_to_refresh(client, table) ⇒ Object
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
# File 'lib/pg_online_schema_change/query.rb', line 190
def referential_foreign_keys_to_refresh(client, table)
references =
get_all_constraints_for(client).select do |row|
row["table_from"] == table && row["constraint_type"] == "f"
end
references
.map do |row|
add_statement =
if row["definition"].end_with?("NOT VALID")
"ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]};"
else
"ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]} NOT VALID;"
end
drop_statement =
"ALTER TABLE #{row["table_on"]} DROP CONSTRAINT #{row["constraint_name"]};"
"#{drop_statement} #{add_statement}"
end
.join
end
|
.renamed_columns(client) ⇒ Object
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
|
# File 'lib/pg_online_schema_change/query.rb', line 261
def renamed_columns(client)
PgQuery
.parse(client.alter_statement)
.tree
.stmts
.map do |statement|
next if statement.stmt.rename_stmt.nil?
{
old_name: statement.stmt.rename_stmt.subname,
new_name: statement.stmt.rename_stmt.newname,
}
end
.flatten
.compact
end
|
.run(connection, query, reuse_trasaction = false, &block) ⇒ Object
rubocop:disable Style/ArgumentsForwarding
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/pg_online_schema_change/query.rb', line 69
def run(connection, query, reuse_trasaction = false, &block) if [PG::PQTRANS_INERROR, PG::PQTRANS_UNKNOWN].include?(connection.transaction_status)
connection.cancel
end
logger.debug("Running query", { query: query })
connection.async_exec("BEGIN;")
result = connection.async_exec(query, &block) rescue Exception connection.cancel if connection.transaction_status != PG::PQTRANS_IDLE
connection.block
logger.info("Exception raised, rolling back query", { rollback: true, query: query })
connection.async_exec("ROLLBACK;")
raise
else
connection.async_exec("COMMIT;") unless reuse_trasaction
result
end
|
.same_table?(query) ⇒ Boolean
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/pg_online_schema_change/query.rb', line 29
def same_table?(query)
tables =
PgQuery
.parse(query)
.tree
.stmts
.filter_map do |statement|
if statement.stmt.alter_table_stmt.instance_of?(PgQuery::AlterTableStmt)
statement.stmt.alter_table_stmt.relation.relname
elsif statement.stmt.rename_stmt.instance_of?(PgQuery::RenameStmt)
statement.stmt.rename_stmt.relation.relname
end
end
tables.uniq.count == 1
rescue PgQuery::ParseError
false
end
|
.self_foreign_keys_to_refresh(client, table) ⇒ Object
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
# File 'lib/pg_online_schema_change/query.rb', line 213
def self_foreign_keys_to_refresh(client, table)
references =
get_all_constraints_for(client).select do |row|
row["table_on"] == table && row["constraint_type"] == "f"
end
references
.map do |row|
add_statement =
if row["definition"].end_with?("NOT VALID")
"ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]};"
else
"ALTER TABLE #{row["table_on"]} ADD CONSTRAINT #{row["constraint_name"]} #{row["definition"]} NOT VALID;"
end
add_statement
end
.join
end
|
.storage_parameters_for(client, table, reuse_trasaction = false) ⇒ Object
299
300
301
302
303
304
305
306
307
308
309
310
|
# File 'lib/pg_online_schema_change/query.rb', line 299
def storage_parameters_for(client, table, reuse_trasaction = false)
query = <<~SQL
SELECT array_to_string(reloptions, ',') as params FROM pg_class WHERE relname='#{table}';
SQL
columns = []
run(client.connection, query, reuse_trasaction) do |result|
columns = result.map { |row| row["params"] }
end
columns.first
end
|
.table(query) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/pg_online_schema_change/query.rb', line 48
def table(query)
from_rename_statement =
PgQuery
.parse(query)
.tree
.stmts
.filter_map { |statement| statement.stmt.rename_stmt&.relation&.relname }[
0
]
PgQuery.parse(query).tables[0] || from_rename_statement
end
|
.table_columns(client, table = nil, reuse_trasaction = false) ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/pg_online_schema_change/query.rb', line 90
def table_columns(client, table = nil, reuse_trasaction = false)
sql = <<~SQL
SELECT attname as column_name, format_type(atttypid, atttypmod) as type, attnum as column_position FROM pg_attribute
WHERE attrelid = '#{table || client.table_name}'::regclass AND attnum > 0 AND NOT attisdropped
ORDER BY attnum;
SQL
mapped_columns = []
run(client.connection, sql, reuse_trasaction) do |result|
mapped_columns =
result.map do |row|
row["column_name_regular"] = row["column_name"]
row["column_name"] = client.connection.quote_ident(row["column_name"])
row["column_position"] = row["column_position"].to_i
row
end
end
mapped_columns
end
|
.table_name(query, table) ⇒ Object
60
61
62
63
64
65
66
67
|
# File 'lib/pg_online_schema_change/query.rb', line 60
def table_name(query, table)
table_name = "\"#{table}\""
if table =~ /[A-Z]/ && query.include?(table_name) && table[0] != '"'
table_name
else
table
end
end
|
.view_definitions_for(client, table) ⇒ Object
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
# File 'lib/pg_online_schema_change/query.rb', line 312
def view_definitions_for(client, table)
query = <<~SQL
SELECT DISTINCT
dependent_view.relname AS view_name,
pg_get_viewdef(dependent_view.oid) AS view_definition,
view_ns.nspname AS schema_name
FROM pg_class AS source_table
JOIN pg_depend ON pg_depend.refobjid = source_table.oid
JOIN pg_rewrite ON pg_rewrite.oid = pg_depend.objid
JOIN pg_class AS dependent_view ON dependent_view.oid = pg_rewrite.ev_class
JOIN pg_namespace AS view_ns ON dependent_view.relnamespace = view_ns.oid
AND dependent_view.relkind = 'v'
AND source_table.relname = '#{table}';
SQL
definitions = []
run(client.connection, query) do |result|
definitions =
result.map do |row|
{ "\"#{row["schema_name"]}\".#{row["view_name"]}" => row["view_definition"].strip }
end
end
definitions
end
|