Class: DbTransformer::Synchronizer

Inherits:
Object
  • Object
show all
Defined in:
lib/db_transformer/synchronizer.rb

Constant Summary collapse

LIMIT =
500

Instance Method Summary collapse

Constructor Details

#initialize(settings) ⇒ Synchronizer

Returns a new instance of Synchronizer.



5
6
7
# File 'lib/db_transformer/synchronizer.rb', line 5

def initialize(settings)
  @settings = settings
end

Instance Method Details

#execute!Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/db_transformer/synchronizer.rb', line 9

def execute!
  destination_database_client.run('SET foreign_key_checks = 0;')

  tables.each do |table_name|
    if @settings.dig('destination', 'options', 'force_replace')
      destination_database_client.run("DROP TABLE IF EXISTS #{table_name}")
    end

    create_table_query = source_database_client.fetch('SHOW CREATE TABLE ?', table_name).first[:'Create Table']
    create_table_query = create_table_query.gsub('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
    destination_database_client.run(create_table_query)
  end

  Parallel.each(tables, in_threads: [source_database_max_connection, destination_database_max_connection].min) do |table_name|
    destination_database_client.run('SET foreign_key_checks = 0;')

    logger.info("Start copying `#{table_name}`")

    offset = 0
    loop do
      data = source_database_client[table_name].limit(LIMIT, offset)

      table = destination_database_client[table_name]
      table.multi_insert(data.map { |row| Transform.execute!(table_name, row, rules) })

      break if data.none?

      offset += LIMIT
    end

    logger.info("Copied `#{table_name}`")
  end

  destination_database_client.run('SET foreign_key_checks = 1;')
rescue Sequel::DatabaseConnectionError => e
  if e.message.include?('Unknown database')
    destination_database_client_without_db.run("CREATE DATABASE `#{@settings['destination']['database']}`")

    retry
  end

  raise
end