Class: PgSync::TableSync
- Inherits:
-
Object
show all
- Includes:
- Utils
- Defined in:
- lib/pgsync/table_sync.rb
Constant Summary
Constants included
from Utils
Utils::COLOR_CODES
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Utils
#colorize, #confirm_tables_exist, #db_config_file, #deprecated, #escape, #first_schema, #friendly_name, #log, #monotonic_time, #output, #quote_ident, #quote_ident_full, #quote_string, #task_name, #warning
Constructor Details
#initialize(source:, destination:, tasks:, opts:, resolver:) ⇒ TableSync
Returns a new instance of TableSync.
7
8
9
10
11
12
13
|
# File 'lib/pgsync/table_sync.rb', line 7
def initialize(source:, destination:, tasks:, opts:, resolver:)
@source = source
@destination = destination
@tasks = tasks
@opts = opts
@resolver = resolver
end
|
Instance Attribute Details
#destination ⇒ Object
Returns the value of attribute destination.
5
6
7
|
# File 'lib/pgsync/table_sync.rb', line 5
def destination
@destination
end
|
#opts ⇒ Object
Returns the value of attribute opts.
5
6
7
|
# File 'lib/pgsync/table_sync.rb', line 5
def opts
@opts
end
|
#resolver ⇒ Object
Returns the value of attribute resolver.
5
6
7
|
# File 'lib/pgsync/table_sync.rb', line 5
def resolver
@resolver
end
|
#source ⇒ Object
Returns the value of attribute source.
5
6
7
|
# File 'lib/pgsync/table_sync.rb', line 5
def source
@source
end
|
#tasks ⇒ Object
Returns the value of attribute tasks.
5
6
7
|
# File 'lib/pgsync/table_sync.rb', line 5
def tasks
@tasks
end
|
Instance Method Details
#add_columns ⇒ Object
31
32
33
34
35
36
37
38
39
|
# File 'lib/pgsync/table_sync.rb', line 31
def add_columns
source_columns = columns(source)
destination_columns = columns(destination)
tasks.each do |task|
task.from_columns = source_columns[task.table] || []
task.to_columns = destination_columns[task.table] || []
end
end
|
#add_primary_keys ⇒ Object
41
42
43
44
45
46
47
|
# File 'lib/pgsync/table_sync.rb', line 41
def add_primary_keys
destination_primary_keys = primary_keys(destination)
tasks.each do |task|
task.to_primary_key = destination_primary_keys[task.table] || []
end
end
|
#add_sequences ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/pgsync/table_sync.rb', line 49
def add_sequences
source_sequences = sequences(source)
destination_sequences = sequences(destination)
tasks.each do |task|
shared_columns = Set.new(task.shared_fields)
task.from_sequences = (source_sequences[task.table] || []).select { |s| shared_columns.include?(s.column) }
task.to_sequences = (destination_sequences[task.table] || []).select { |s| shared_columns.include?(s.column) }
end
end
|
#columns(data_source) ⇒ Object
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/pgsync/table_sync.rb', line 135
def columns(data_source)
query = <<~SQL
SELECT
table_schema AS schema,
table_name AS table,
column_name AS column,
data_type AS type
FROM
information_schema.columns
WHERE
is_generated = 'NEVER'
ORDER BY 1, 2, 3
SQL
data_source.execute(query).group_by { |r| Table.new(r["schema"], r["table"]) }.map do |k, v|
[k, v.map { |r| {name: r["column"], type: r["type"]} }]
end.to_h
end
|
#display_item(item) ⇒ Object
313
314
315
316
317
318
|
# File 'lib/pgsync/table_sync.rb', line 313
def display_item(item)
messages = []
messages << task_name(item)
messages << item.opts[:sql] if item.opts[:sql]
messages.join(" ")
end
|
#fail_sync(failed_tables) ⇒ Object
309
310
311
|
# File 'lib/pgsync/table_sync.rb', line 309
def fail_sync(failed_tables)
raise Error, "Sync failed for #{failed_tables.size} table#{failed_tables.size == 1 ? nil : "s"}: #{failed_tables.join(", ")}"
end
|
#maybe_defer_constraints ⇒ Object
TODO add option to open transaction on source when manually specifying order of tables
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
# File 'lib/pgsync/table_sync.rb', line 264
def maybe_defer_constraints
if opts[:disable_integrity] || opts[:disable_integrity_v2]
source.transaction do
yield
end
elsif opts[:defer_constraints_v1] || opts[:defer_constraints_v2]
destination.transaction do
if opts[:defer_constraints_v2]
table_constraints = non_deferrable_constraints(destination)
table_constraints.each do |table, constraints|
constraints.each do |constraint|
destination.execute("ALTER TABLE #{quote_ident_full(table)} ALTER CONSTRAINT #{quote_ident(constraint)} DEFERRABLE")
end
end
end
destination.execute("SET CONSTRAINTS ALL DEFERRED")
source.transaction do
yield
end
if opts[:defer_constraints_v2]
destination.execute("SET CONSTRAINTS ALL IMMEDIATE")
table_constraints.each do |table, constraints|
constraints.each do |constraint|
destination.execute("ALTER TABLE #{quote_ident_full(table)} ALTER CONSTRAINT #{quote_ident(constraint)} NOT DEFERRABLE")
end
end
end
end
else
yield
end
end
|
#non_deferrable_constraints(data_source) ⇒ Object
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
# File 'lib/pgsync/table_sync.rb', line 153
def non_deferrable_constraints(data_source)
query = <<~SQL
SELECT
table_schema AS schema,
table_name AS table,
constraint_name
FROM
information_schema.table_constraints
WHERE
constraint_type = 'FOREIGN KEY' AND
is_deferrable = 'NO'
SQL
data_source.execute(query).group_by { |r| Table.new(r["schema"], r["table"]) }.map do |k, v|
[k, v.map { |r| r["constraint_name"] }]
end.to_h
end
|
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/pgsync/table_sync.rb', line 15
def perform
confirm_tables_exist(destination, tasks, "destination")
add_columns
add_primary_keys
add_sequences unless opts[:no_sequences]
show_notes
run_tasks(tasks.reject { |task| task.shared_fields.empty? })
end
|
#primary_keys(data_source) ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/pgsync/table_sync.rb', line 89
def primary_keys(data_source)
query = <<~SQL
SELECT
nspname AS schema,
relname AS table,
pg_attribute.attname AS column,
format_type(pg_attribute.atttypid, pg_attribute.atttypmod),
pg_attribute.attnum,
pg_index.indkey
FROM
pg_index, pg_class, pg_attribute, pg_namespace
WHERE
indrelid = pg_class.oid AND
pg_class.relnamespace = pg_namespace.oid AND
pg_attribute.attrelid = pg_class.oid AND
pg_attribute.attnum = any(pg_index.indkey) AND
indisprimary
SQL
data_source.execute(query).group_by { |r| Table.new(r["schema"], r["table"]) }.map do |k, v|
[k, v.sort_by { |r| r["indkey"].split(" ").index(r["attnum"]) }.map { |r| r["column"] }]
end.to_h
end
|
#run_tasks(tasks, &block) ⇒ Object
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
# File 'lib/pgsync/table_sync.rb', line 170
def run_tasks(tasks, &block)
notices = []
failed_tables = []
started_at = {}
show_spinners = output.tty? && !opts[:in_batches] && !opts[:debug]
if show_spinners
spinners = TTY::Spinner::Multi.new(format: :dots, output: output)
task_spinners = {}
end
start = lambda do |task, i|
message = ":spinner #{display_item(task)}"
if show_spinners
spinner = spinners.register(message)
spinner.auto_spin
task_spinners[task] = spinner
elsif opts[:in_batches]
log message.sub(":spinner", "⠋")
end
started_at[task] = monotonic_time
end
finish = lambda do |task, i, result|
time = (monotonic_time - started_at[task]).round(1)
success = result[:status] == "success"
message =
if result[:message]
"(#{result[:message].lines.first.to_s.strip})"
else
"- #{time}s"
end
notices.concat(result[:notices])
if show_spinners
spinner = task_spinners[task]
if success
spinner.success(message)
else
spinner.error(message)
end
else
status = success ? "✔" : "✖"
log [status, display_item(task), message].join(" ")
end
unless success
failed_tables << task_name(task)
fail_sync(failed_tables) if opts[:fail_fast]
end
end
options = {start: start, finish: finish}
jobs = opts[:jobs]
if opts[:debug] || opts[:in_batches] || opts[:defer_constraints_v1] || opts[:defer_constraints_v2] || opts[:disable_integrity] || opts[:disable_integrity_v2]
warning "--jobs ignored" if jobs
jobs = 0
end
if windows?
options[:in_threads] = jobs || 4
else
options[:in_processes] = jobs if jobs
end
maybe_defer_constraints do
Parallel.each(tasks, **options) do |task|
source.reconnect_if_needed
destination.reconnect_if_needed
task.perform
end
end
notices.each do |notice|
warning notice
end
fail_sync(failed_tables) if failed_tables.any?
end
|
#sequences(data_source) ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/pgsync/table_sync.rb', line 61
def sequences(data_source)
query = <<~SQL
SELECT
nt.nspname as schema,
t.relname as table,
a.attname as column,
n.nspname as sequence_schema,
s.relname as sequence
FROM
pg_class s
INNER JOIN
pg_depend d ON d.objid = s.oid
INNER JOIN
pg_class t ON d.objid = s.oid AND d.refobjid = t.oid
INNER JOIN
pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum)
INNER JOIN
pg_namespace n ON n.oid = s.relnamespace
INNER JOIN
pg_namespace nt ON nt.oid = t.relnamespace
WHERE
s.relkind = 'S'
SQL
data_source.execute(query).group_by { |r| Table.new(r["schema"], r["table"]) }.map do |k, v|
[k, v.map { |r| Sequence.new(r["sequence_schema"], r["sequence"], column: r["column"]) }]
end.to_h
end
|
#show_notes ⇒ Object
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/pgsync/table_sync.rb', line 114
def show_notes
resolver.notes.each do |note|
warning note
end
tasks.each do |task|
task.notes.each do |note|
warning "#{task_name(task)}: #{note}"
end
end
if opts[:defer_constraints_v1]
constraints = non_deferrable_constraints(destination)
constraints = tasks.flat_map { |t| constraints[t.table] || [] }
warning "Non-deferrable constraints: #{constraints.join(", ")}" if constraints.any?
end
end
|
#windows? ⇒ Boolean
320
321
322
|
# File 'lib/pgsync/table_sync.rb', line 320
def windows?
Gem.win_platform?
end
|