Class: Webhookdb::SyncTarget::DatabaseRoutine

Inherits:
Routine < Object
Defined in:
lib/webhookdb/sync_target.rb

Overview

  • Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.

  • Select rows created/updated since our last update in our ‘source’ database.

  • Write them to disk into a CSV file.

  • Pass this CSV file to the proper sync target adapter.

  • For example, the PG sync target will:

    • Create a temp table in the target database, using the schema from the sync target table.

    • Load the data into that temp table.

    • Insert into the target table the temp table rows that do not already appear in the target table.

    • Update the target table rows that also appear in the temp table, using the temp table's values.

  • The Snowflake sync target will:

    • PUT the CSV file into the stage for the table.

    • Otherwise, the logic is the same as PG: create a temp table and COPY INTO it from the staged CSV file.

    • Purge the staged file.
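The insert/update behavior described for the PG target can be modeled in plain Ruby. This is a minimal sketch of the semantics only, not the adapter's actual SQL: it assumes an in-memory Hash keyed by a hypothetical `pk` column stands in for the target table, and a CSV string with a header line stands in for the exported file. `merge_csv_into` is an illustrative name, not part of Webhookdb.

```ruby
require "csv"

# Hypothetical in-memory model of the PG merge: `target` maps a key value to a
# row Hash, and `csv_text` is an export with a header line.
def merge_csv_into(target, csv_text, key:)
  counts = { inserted: 0, updated: 0 }
  CSV.parse(csv_text, headers: true).each do |csv_row|
    row = csv_row.to_h
    k = row.fetch(key)
    if target.key?(k)
      # Key already present: update the existing row with the incoming values.
      target[k] = target[k].merge(row)
      counts[:updated] += 1
    else
      # Key not present: insert the incoming row.
      target[k] = row
      counts[:inserted] += 1
    end
  end
  counts
end

target = { "1" => { "pk" => "1", "data" => "old" } }
counts = merge_csv_into(target, "pk,data\n1,new\n2,fresh\n", key: "pk")
# counts[:inserted] == 1 and counts[:updated] == 1
# target["1"]["data"] == "new" and target["2"]["data"] == "fresh"
```

Because CSV values arrive as strings, the keys in this sketch are strings too; a real database would cast them to the column types defined by the target table's schema.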

Instance Attribute Summary

Attributes inherited from Routine

#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr

Instance Method Summary

Methods inherited from Routine

#dataset_to_sync, #record

Constructor Details

#initialize(now, sync_target) ⇒ DatabaseRoutine

Returns a new instance of DatabaseRoutine.



# File 'lib/webhookdb/sync_target.rb', line 422

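def initialize(now, sync_target)
  super
  # Resolve the database adapter (for example PG or Snowflake) from the
  # sync target's connection URL, then open a connection through it.
  @connection_url = self.sync_target.connection_url
  @adapter = Webhookdb::DBAdapter.adapter(@connection_url)
  @adapter_connection = @adapter.connection(@connection_url)
end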
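The constructor resolves an adapter from the connection URL and opens a connection with it. One plausible way such a lookup can work is to dispatch on the URL scheme. The sketch below assumes exactly that; the classes `PostgresAdapterSketch` and `SnowflakeAdapterSketch`, the `ADAPTERS_BY_SCHEME` table, and the `adapter_for` helper are all hypothetical, not the actual implementation of `Webhookdb::DBAdapter.adapter`.

```ruby
require "uri"

# Hypothetical adapter classes; Webhookdb's real adapters are not shown here.
class PostgresAdapterSketch
  # Stand-in for opening a connection; returns a description string.
  def connection(url)
    "pg connection to #{URI.parse(url).host}"
  end
end

class SnowflakeAdapterSketch
  def connection(url)
    "snowflake connection to #{URI.parse(url).host}"
  end
end

# Hypothetical scheme-to-adapter table.
ADAPTERS_BY_SCHEME = {
  "postgres" => PostgresAdapterSketch,
  "postgresql" => PostgresAdapterSketch,
  "snowflake" => SnowflakeAdapterSketch,
}.freeze

# Pick an adapter from the URL scheme, failing loudly for unknown schemes.
def adapter_for(url)
  scheme = URI.parse(url).scheme
  klass = ADAPTERS_BY_SCHEME.fetch(scheme) do
    raise ArgumentError, "no adapter for scheme #{scheme.inspect}"
  end
  klass.new
end

# Mirrors the constructor: resolve the adapter, then open a connection.
url = "postgres://user:pass@db.example.com:5432/warehouse"
adapter = adapter_for(url)
connection = adapter.connection(url)
# connection == "pg connection to db.example.com"
```

Raising on an unknown scheme surfaces a misconfigured sync target when the routine is constructed, before any schema or data work begins.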

Instance Method Details

#run ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 429

def run
  # Use the sync target's schema and table if set; otherwise fall back to defaults.
  schema_name = @sync_target.schema.present? ? @sync_target.schema : @sync_target.class.default_schema
  table_name = @sync_target.table.present? ? @sync_target.table : @sync_target.service_integration.table_name
  adapter = @adapter
  schema = Webhookdb::DBAdapter::Schema.new(name: schema_name.to_sym)
  table = Webhookdb::DBAdapter::Table.new(name: table_name.to_sym, schema:)

  # Build idempotent DDL: create the schema and the table (primary key and
  # remote key columns), then add each denormalized column and the data column.
  schema_lines = []
  schema_lines << adapter.create_schema_sql(table.schema, if_not_exists: true)
  schema_lines << adapter.create_table_sql(
    table,
    [@replicator.primary_key_column, @replicator.remote_key_column],
    if_not_exists: true,
  )
  (@replicator.denormalized_columns + [@replicator.data_column]).each do |col|
    schema_lines << adapter.add_column_sql(table, col, if_not_exists: true)
  end
  # Note: this opens a new connection; @adapter_connection from #initialize is not used here.
  adapter_conn = adapter.connection(@connection_url)
  # Execute the DDL only when it differs from the last applied schema.
  schema_expr = schema_lines.join(";\n") + ";"
  if schema_expr != self.sync_target.last_applied_schema
    adapter_conn.execute(schema_expr)
    self.sync_target.update(last_applied_schema: schema_expr)
  end
  tempfile = Tempfile.new("whdbsyncout-#{self.sync_target.id}")
  begin
    # Export the rows to sync as CSV (with a header line) using COPY.
    self.dataset_to_sync do |ds|
      ds.db.copy_table(ds, options: "DELIMITER ',', HEADER true, FORMAT csv") do |row|
        tempfile.write(row)
      end
    end
    tempfile.rewind
    # Merge the CSV into the target table, keyed on the primary key column.
    adapter.merge_from_csv(
      adapter_conn,
      tempfile,
      table,
      @replicator.primary_key_column,
      [@replicator.primary_key_column, @replicator.remote_key_column] +
        @replicator.denormalized_columns + [@replicator.data_column],
    )
    self.record(self.now)
  ensure
    tempfile.unlink
  end
end
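The control flow of #run can be summarized with stand-ins for the adapter and sync target. This is a minimal sketch under stated assumptions: `SyncTargetSketch`, `run_sketch`, and the lambdas for DDL generation, execution, and merging are hypothetical names, not Webhookdb APIs. It keeps two ideas from the method: DDL is applied only when the joined statement text changes, and the exported CSV lives in a `Tempfile` that is cleaned up in an `ensure` block.

```ruby
require "tempfile"

# Hypothetical stand-in for a sync target; only these fields are modeled.
SyncTargetSketch = Struct.new(:schema, :table, :last_applied_schema)

# Hypothetical sketch of #run's control flow. `ddl_for` returns DDL statements,
# `execute` runs SQL, `rows` are CSV lines, and `merge` consumes the CSV file.
def run_sketch(target, default_schema:, default_table:, ddl_for:, execute:, rows:, merge:)
  # Fall back to defaults when the target leaves schema or table blank.
  schema_name = target.schema.to_s.empty? ? default_schema : target.schema
  table_name = target.table.to_s.empty? ? default_table : target.table

  # Join the DDL and apply it only if it differs from what was last applied.
  schema_expr = ddl_for.call(schema_name, table_name).join(";\n") + ";"
  if schema_expr != target.last_applied_schema
    execute.call(schema_expr)
    target.last_applied_schema = schema_expr
  end

  tempfile = Tempfile.new("syncout")
  begin
    rows.each { |line| tempfile.write(line) }
    tempfile.rewind # the merge step reads the file from the beginning
    merge.call(tempfile, "#{schema_name}.#{table_name}")
  ensure
    tempfile.close! # close and delete the file even if the merge raises
  end
end

# Usage: two runs with unchanged DDL execute it once but merge twice.
executed = []
merged = []
ddl_for = lambda do |schema, table|
  ["CREATE SCHEMA IF NOT EXISTS #{schema}",
   "CREATE TABLE IF NOT EXISTS #{schema}.#{table} (pk bigint)"]
end
run_ddl = ->(sql) { executed << sql }
merge_csv = ->(file, table) { merged << [table, file.read] }
target = SyncTargetSketch.new(nil, "", nil)
2.times do
  run_sketch(target, default_schema: "public", default_table: "orders",
             ddl_for: ddl_for, execute: run_ddl, rows: ["pk\n", "1\n"], merge: merge_csv)
end
```

Comparing the full DDL text against `last_applied_schema` avoids executing DDL on every run; any change to the replicator's columns produces different text, so the `IF NOT EXISTS` statements run again. The sketch uses `Tempfile#close!`, which closes and deletes the file, where the original calls `unlink`.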