Class: PgSync::Task
Constant Summary
Constants included from Utils
Instance Attribute Summary
- #config ⇒ Object (readonly)
  Returns the value of attribute config.
- #destination ⇒ Object (readonly)
  Returns the value of attribute destination.
- #from_columns ⇒ Object
  Returns the value of attribute from_columns.
- #from_sequences ⇒ Object
  Returns the value of attribute from_sequences.
- #opts ⇒ Object (readonly)
  Returns the value of attribute opts.
- #source ⇒ Object (readonly)
  Returns the value of attribute source.
- #table ⇒ Object (readonly)
  Returns the value of attribute table.
- #to_columns ⇒ Object
  Returns the value of attribute to_columns.
- #to_primary_key ⇒ Object
  Returns the value of attribute to_primary_key.
- #to_sequences ⇒ Object
  Returns the value of attribute to_sequences.
Instance Method Summary
- #from_fields ⇒ Object
- #initialize(source:, destination:, config:, table:, opts:) ⇒ Task (constructor)
  A new instance of Task.
- #notes ⇒ Object
- #perform ⇒ Object
- #quoted_table ⇒ Object
- #shared_fields ⇒ Object
- #shared_sequences ⇒ Object
- #sync_data ⇒ Object
- #to_fields ⇒ Object
Methods included from Utils
#colorize, #confirm_tables_exist, #db_config_file, #deprecated, #escape, #first_schema, #friendly_name, #log, #monotonic_time, #output, #quote_ident, #quote_ident_full, #quote_string, #task_name, #warning
Constructor Details
#initialize(source:, destination:, config:, table:, opts:) ⇒ Task
Returns a new instance of Task.
8 9 10 11 12 13 14 15 16 |
# File 'lib/pgsync/task.rb', line 8

# Builds a task from its collaborators and starts it with empty sequence lists.
#
# @param source [Object] stored as-is and exposed through +source+
# @param destination [Object] stored as-is and exposed through +destination+
# @param config [Object] stored as-is and exposed through +config+
# @param table [Object] stored as-is and exposed through +table+
# @param opts [Object] stored as-is and exposed through +opts+
def initialize(source:, destination:, config:, table:, opts:)
  @source, @destination = source, destination
  @config, @table, @opts = config, table, opts

  # Two separate arrays, so appending to one never affects the other.
  @from_sequences, @to_sequences = [], []
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
5 6 7 |
# File 'lib/pgsync/task.rb', line 5 def config @config end |
#destination ⇒ Object (readonly)
Returns the value of attribute destination.
5 6 7 |
# File 'lib/pgsync/task.rb', line 5 def destination @destination end |
#from_columns ⇒ Object
Returns the value of attribute from_columns.
6 7 8 |
# File 'lib/pgsync/task.rb', line 6 def from_columns @from_columns end |
#from_sequences ⇒ Object
Returns the value of attribute from_sequences.
6 7 8 |
# File 'lib/pgsync/task.rb', line 6 def from_sequences @from_sequences end |
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
5 6 7 |
# File 'lib/pgsync/task.rb', line 5 def opts @opts end |
#source ⇒ Object (readonly)
Returns the value of attribute source.
5 6 7 |
# File 'lib/pgsync/task.rb', line 5 def source @source end |
#table ⇒ Object (readonly)
Returns the value of attribute table.
5 6 7 |
# File 'lib/pgsync/task.rb', line 5 def table @table end |
#to_columns ⇒ Object
Returns the value of attribute to_columns.
6 7 8 |
# File 'lib/pgsync/task.rb', line 6 def to_columns @to_columns end |
#to_primary_key ⇒ Object
Returns the value of attribute to_primary_key.
6 7 8 |
# File 'lib/pgsync/task.rb', line 6 def to_primary_key @to_primary_key end |
#to_sequences ⇒ Object
Returns the value of attribute to_sequences.
6 7 8 |
# File 'lib/pgsync/task.rb', line 6 def to_sequences @to_sequences end |
Instance Method Details
#from_fields ⇒ Object
32 33 34 |
# File 'lib/pgsync/task.rb', line 32

# Names (the +:name+ entry) of every column in +from_columns+.
# The list is built on first use and cached in +@from_fields+.
#
# @return [Array] one name per entry of +from_columns+, in the same order
def from_fields
  return @from_fields if @from_fields

  @from_fields = from_columns.map { |column| column[:name] }
end
#notes ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/pgsync/task.rb', line 48

# Describes how the "from" and "to" sides of the table differ.
#
# When there are no shared fields the only note is "No fields to copy".
# Otherwise one note is produced per non-empty category, in this order:
# extra columns, missing columns, extra sequences, missing sequences and
# shared columns whose +:type+ differs between +from_columns+ and +to_columns+.
#
# @return [Array<String>] a fresh array of notes (empty when nothing differs)
def notes
  return ["No fields to copy"] if shared_fields.empty?

  findings = [
    ["Extra columns", to_fields - from_fields],
    ["Missing columns", from_fields - to_fields],
    ["Extra sequences", to_sequences - from_sequences],
    ["Missing sequences", from_sequences - to_sequences]
  ]

  # name => type lookups; a later duplicate name overrides an earlier one
  source_types = from_columns.each_with_object({}) { |column, types| types[column[:name]] = column[:type] }
  target_types = to_columns.each_with_object({}) { |column, types| types[column[:name]] = column[:type] }

  mismatched = shared_fields.select { |name| source_types[name] != target_types[name] }
  findings << [
    "Different column types",
    mismatched.map { |name| "#{name} (#{source_types[name]} -> #{target_types[name]})" }
  ]

  findings
    .select { |_label, items| items.any? }
    .map { |label, items| "#{label}: #{items.join(", ")}" }
end
#perform ⇒ Object
22 23 24 25 26 27 28 29 30 |
# File 'lib/pgsync/task.rb', line 22

# Runs +sync_data+ inside three wrappers, outermost first:
# +with_notices+, +handle_errors+ and +maybe_disable_triggers+.
#
# @return [Object] whatever +with_notices+ returns for the wrapped block
def perform
  with_notices do
    handle_errors { maybe_disable_triggers { sync_data } }
  end
end
#quoted_table ⇒ Object
18 19 20 |
# File 'lib/pgsync/task.rb', line 18

# The task's +table+ passed through +quote_ident_full+, for use in SQL text.
#
# @return [Object] the result of +quote_ident_full(table)+
def quoted_table
  quote_ident_full(table)
end
#shared_fields ⇒ Object
40 41 42 |
# File 'lib/pgsync/task.rb', line 40

# Fields that appear in both +to_fields+ and +from_fields+.
# Computed with Array#& (so ordered like +to_fields+, without duplicates)
# on first use and cached in +@shared_fields+.
#
# @return [Array] the common field names
def shared_fields
  return @shared_fields if @shared_fields

  @shared_fields = to_fields & from_fields
end
#shared_sequences ⇒ Object
44 45 46 |
# File 'lib/pgsync/task.rb', line 44

# Sequences that appear in both +to_sequences+ and +from_sequences+.
# Computed with Array#& on first use and cached in +@shared_sequences+.
#
# @return [Array] the common sequences, ordered like +to_sequences+
def shared_sequences
  return @shared_sequences if @shared_sequences

  @shared_sequences = to_sequences & from_sequences
end
#sync_data ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/pgsync/task.rb', line 78

# Copies the shared fields of +table+ from +source+ to +destination+, then
# sets every shared sequence on the destination to the source's last value.
#
# Exactly one copy strategy runs:
# * +opts[:in_batches]+: rows are copied in primary-key ranges of
#   +opts[:batch_size]+, continuing after the destination's current max id.
# * +opts[:overwrite]+, +opts[:preserve]+ or an +opts[:sql]+ clause, as long as
#   +opts[:truncate]+ is not set: rows are loaded into a temporary table and
#   merged with INSERT ... ON CONFLICT.
# * otherwise: the destination table is emptied and everything is copied.
#
# Fields matched by a rule in +config["data_rules"]+ are selected through
# +apply_strategy+ instead of being read directly, unless +opts[:no_rules]+ is set.
#
# @return [Hash] +{status: "success"}+
# @raise [Error] when there are no shared fields, when the chosen strategy
#   needs a primary key and +to_primary_key+ is empty, or when batching is
#   requested with a batch size that is not greater than zero
def sync_data
  raise Error, "This should never happen. Please file a bug." if shared_fields.empty?

  sql_clause = opts[:sql] ? " #{opts[:sql]}" : ""
  primary_key = to_primary_key
  copy_fields = sync_data_select_list(primary_key)
  fields = shared_fields.map { |f| quote_ident(f) }.join(", ")
  copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quoted_table}#{sql_clause}) TO STDOUT"

  if opts[:in_batches]
    sync_data_in_batches(copy_fields, fields, sql_clause, primary_key)
  elsif !opts[:truncate] && (opts[:overwrite] || opts[:preserve] || !sql_clause.empty?)
    sync_data_via_temp_table(copy_to_command, fields, primary_key)
  else
    sync_data_full_table(copy_to_command, fields)
  end

  # update sequences
  shared_sequences.each do |seq|
    value = source.last_value(seq)
    destination.execute("SELECT setval(#{escape(quote_ident_full(seq))}, #{escape(value)})")
  end

  {status: "success"}
end

# Builds the SELECT list: a plain qualified column for ordinary fields, or the
# +apply_strategy+ expression aliased to the field name when a data rule matches.
private def sync_data_select_list(primary_key)
  rules = (opts[:no_rules] ? [] : config["data_rules"]).to_a

  shared_fields.map do |field|
    rule = rules.find { |pattern, _| rule_match?(table, field, pattern) }
    if rule
      "#{apply_strategy(rule[1], table, field, primary_key)} AS #{quote_ident(field)}"
    else
      "#{quoted_table}.#{quote_ident(field)}"
    end
  end.join(", ")
end

# Copies rows in consecutive primary-key ranges, one COPY per range.
private def sync_data_in_batches(copy_fields, fields, sql_clause, primary_key)
  raise Error, "Primary key required for --in-batches" if primary_key.empty?

  # Validate before touching the destination: a nil/zero size used to fail
  # only at +ceil+ (after an optional truncate) and a negative size never
  # let the loop below terminate.
  batch_size = opts[:batch_size]
  raise Error, "Batch size must be positive for --in-batches" unless batch_size.to_f > 0

  key = primary_key.first

  destination.truncate(table) if opts[:truncate]

  from_max_id = source.max_id(table, key)
  to_max_id = destination.max_id(table, key) + 1

  # Destination reported 0: start at the source's smallest id when it is positive.
  if to_max_id == 1
    from_min_id = source.min_id(table, key)
    to_max_id = from_min_id if from_min_id > 0
  end

  starting_id = to_max_id
  batch_count = ((from_max_id - starting_id + 1) / batch_size.to_f).ceil
  quoted_key = quote_ident(key)

  # TODO be smarter for advanced sql clauses
  clause_prefix = sql_clause.empty? ? "WHERE" : "#{sql_clause} AND"

  i = 1
  while starting_id <= from_max_id
    where = "#{quoted_key} >= #{starting_id} AND #{quoted_key} < #{starting_id + batch_size}"
    log " #{i}/#{batch_count}: #{where}"

    batch_copy_to_command = "COPY (SELECT #{copy_fields} FROM #{quoted_table} #{clause_prefix} #{where}) TO STDOUT"
    copy(batch_copy_to_command, dest_table: table, dest_fields: fields)

    starting_id += batch_size
    i += 1

    # pause between batches, but not after the last one
    sleep(opts[:sleep]) if opts[:sleep] && starting_id <= from_max_id
  end
end

# Loads the rows into a temporary table, then merges them into the real table
# keyed on the primary key (skip on conflict for --preserve, update otherwise).
private def sync_data_via_temp_table(copy_to_command, fields, primary_key)
  if primary_key.empty?
    raise Error, "Primary key required for --overwrite" if opts[:overwrite]
    raise Error, "Primary key required for --preserve" if opts[:preserve]
    raise Error, "Primary key required to sync specific rows"
  end

  # create a temp table
  temp_table = "pgsync_#{rand(1_000_000_000)}"
  destination.execute("CREATE TEMPORARY TABLE #{quote_ident_full(temp_table)} AS TABLE #{quoted_table} WITH NO DATA")

  # load data
  copy(copy_to_command, dest_table: temp_table, dest_fields: fields)

  on_conflict = primary_key.map { |pk| quote_ident(pk) }.join(", ")
  action =
    if opts[:preserve]
      "NOTHING"
    else # overwrite or sql clause
      setter = shared_fields.reject { |f| primary_key.include?(f) }.map { |f| "#{quote_ident(f)} = EXCLUDED.#{quote_ident(f)}" }
      # with only key columns shared there is nothing to update
      setter.any? ? "UPDATE SET #{setter.join(", ")}" : "NOTHING"
    end

  destination.execute("INSERT INTO #{quoted_table} (#{fields}) (SELECT #{fields} FROM #{quote_ident_full(temp_table)}) ON CONFLICT (#{on_conflict}) DO #{action}")
end

# Empties the destination table and copies every selected row into it.
private def sync_data_full_table(copy_to_command, fields)
  # use delete instead of truncate for foreign keys
  if opts[:defer_constraints_v1] || opts[:defer_constraints_v2]
    destination.execute("DELETE FROM #{quoted_table}")
  else
    destination.truncate(table)
  end

  copy(copy_to_command, dest_table: table, dest_fields: fields)
end
#to_fields ⇒ Object
36 37 38 |
# File 'lib/pgsync/task.rb', line 36

# Names (the +:name+ entry) of every column in +to_columns+.
# The list is built on first use and cached in +@to_fields+.
#
# @return [Array] one name per entry of +to_columns+, in the same order
def to_fields
  return @to_fields if @to_fields

  @to_fields = to_columns.map { |column| column[:name] }
end