Class: Webhookdb::SyncTarget::DatabaseRoutine
- Defined in:
- lib/webhookdb/sync_target.rb
Overview
- Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.
- Select rows in our ‘source’ database that were created or updated since the last sync.
- Write them to disk as a CSV file.
- Pass this CSV file to the proper sync target adapter.
- For example, the PG sync target will:
  - Create a temp table in the target database, using the schema from the sync target table.
  - Load the data into that temp table.
  - Insert into the target table the temp table rows that do not appear in the target table.
  - Update rows in the target table from the temp table rows that already appear in the target table.
- The Snowflake sync target will:
  - PUT the CSV file into the stage for the table.
  - Otherwise the logic is the same as PG: create a temp table and COPY INTO from the CSV.
  - Purge the staged file.
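The insert-then-update merge described for the PG sync target can be modeled in plain Ruby. This is only a sketch of the semantics, not the adapter's actual SQL: `merge_staged_rows` is a hypothetical helper, the rows are in-memory hashes standing in for the target and temp tables, and the `:pk` merge key is an assumption made for the example.

```ruby
# Hypothetical in-memory model of the temp-table merge; no database involved.
# `target` and `staged` are arrays of row hashes; `key` is the column rows are matched on.
def merge_staged_rows(target, staged, key:)
  # Index the target rows by key so each staged row can be matched quickly.
  existing = target.to_h { |row| [row.fetch(key), row] }
  inserted = 0
  updated = 0
  staged.each do |row|
    id = row.fetch(key)
    if existing.key?(id)
      # Row already appears in the target: overwrite its columns in place.
      existing[id].merge!(row)
      updated += 1
    else
      # Row does not appear in the target: append a copy.
      copy = row.dup
      target << copy
      existing[id] = copy
      inserted += 1
    end
  end
  {inserted: inserted, updated: updated}
end

target = [{pk: 1, data: "old"}, {pk: 2, data: "keep"}]
staged = [{pk: 1, data: "new"}, {pk: 3, data: "added"}]
result = merge_staged_rows(target, staged, key: :pk)
```

In this model, rows that are absent from the staged set are left untouched in the target, which matches the incremental "rows changed since the last sync" behaviour described above.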
Instance Attribute Summary
Attributes inherited from Routine
#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr
Instance Method Summary
- #initialize(now, sync_target) ⇒ DatabaseRoutine (constructor)
  A new instance of DatabaseRoutine.
- #run ⇒ Object
Methods inherited from Routine
Constructor Details
#initialize(now, sync_target) ⇒ DatabaseRoutine
Returns a new instance of DatabaseRoutine.
# File 'lib/webhookdb/sync_target.rb', line 420

def initialize(now, sync_target)
  super
  @connection_url = self.sync_target.connection_url
  @adapter = Webhookdb::DBAdapter.adapter(@connection_url)
  @adapter_connection = @adapter.connection(@connection_url)
end
Instance Method Details
#run ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 427

def run
  schema_name = @sync_target.schema.present? ? @sync_target.schema : @sync_target.class.default_schema
  table_name = @sync_target.table.present? ? @sync_target.table : @sync_target.service_integration.table_name
  adapter = @adapter
  schema = Webhookdb::DBAdapter::Schema.new(name: schema_name.to_sym)
  table = Webhookdb::DBAdapter::Table.new(name: table_name.to_sym, schema:)

  schema_lines = []
  schema_lines << adapter.create_schema_sql(table.schema, if_not_exists: true)
  schema_lines << adapter.create_table_sql(
    table,
    [@replicator.primary_key_column, @replicator.remote_key_column],
    if_not_exists: true,
  )
  (@replicator.denormalized_columns + [@replicator.data_column]).each do |col|
    schema_lines << adapter.add_column_sql(table, col, if_not_exists: true)
  end
  adapter_conn = adapter.connection(@connection_url)
  schema_expr = schema_lines.join(";\n") + ";"
  if schema_expr != self.sync_target.last_applied_schema
    adapter_conn.execute(schema_expr)
    self.sync_target.update(last_applied_schema: schema_expr)
  end
  tempfile = Tempfile.new("whdbsyncout-#{self.sync_target.id}")
  begin
    self.dataset_to_sync do |ds|
      ds.db.copy_table(ds, options: "DELIMITER ',', HEADER true, FORMAT csv") do |row|
        tempfile.write(row)
      end
    end
    tempfile.rewind
    adapter.merge_from_csv(
      adapter_conn,
      tempfile,
      table,
      @replicator.primary_key_column,
      [@replicator.primary_key_column,
       @replicator.remote_key_column,] + @replicator.denormalized_columns + [@replicator.data_column],
    )
    self.record(self.now)
  ensure
    tempfile.unlink
  end
end
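Two patterns in `#run` may be easier to see in isolation: the schema DDL is only executed when it differs from `last_applied_schema`, and the CSV tempfile is rewound before it is handed off and unlinked no matter what happens. The sketch below reproduces both with stand-ins. `FakeTarget`, `FakeConn`, `apply_schema_if_changed`, and `with_csv_tempfile` are hypothetical names invented for this example, and the setter on `FakeTarget` replaces the `update(last_applied_schema: ...)` call used in the real method.

```ruby
require "tempfile"

# Hypothetical stand-ins: FakeTarget mimics only `last_applied_schema`,
# and FakeConn records the SQL it is asked to execute.
FakeTarget = Struct.new(:last_applied_schema)

class FakeConn
  attr_reader :executed

  def initialize
    @executed = []
  end

  def execute(sql)
    @executed << sql
  end
end

# Run the DDL only when it differs from what was applied last time.
def apply_schema_if_changed(conn, target, schema_lines)
  schema_expr = schema_lines.join(";\n") + ";"
  return false if schema_expr == target.last_applied_schema
  conn.execute(schema_expr)
  target.last_applied_schema = schema_expr
  true
end

# Write lines to a temp file, rewind it for the reader, and always clean it up.
def with_csv_tempfile(lines)
  tempfile = Tempfile.new("sync-sketch")
  begin
    lines.each { |line| tempfile.write(line) }
    tempfile.rewind
    yield tempfile
  ensure
    tempfile.close
    tempfile.unlink
  end
end

conn = FakeConn.new
target = FakeTarget.new(nil)
ddl = ["CREATE SCHEMA IF NOT EXISTS public", "CREATE TABLE IF NOT EXISTS public.t (pk bigint)"]
first = apply_schema_if_changed(conn, target, ddl)
second = apply_schema_if_changed(conn, target, ddl)
contents = with_csv_tempfile(["pk,data\n", "1,{}\n"]) { |f| f.read }
```

Here the second call returns `false` and executes nothing, because the joined DDL string is identical to the one stored after the first call. Comparing the full string is a simple way to skip redundant DDL on every sync, at the cost of re-running all statements whenever any column changes; the `IF NOT EXISTS` clauses keep that re-run harmless.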