Class: Webhookdb::DBAdapter::PG
- Inherits:
- Webhookdb::DBAdapter
- Inheritance hierarchy: Object → Webhookdb::DBAdapter → Webhookdb::DBAdapter::PG
- Includes:
- ColumnTypes, DefaultSql
- Defined in:
- lib/webhookdb/db_adapter/pg.rb
Constant Summary
- VERIFY_TIMEOUT =
2
- VERIFY_STATEMENT =
"SELECT 1"
- COLTYPE_MAP =
{ BIGINT => "bigint", BIGINT_ARRAY => "bigint[]", BOOLEAN => "boolean", DATE => "date", DECIMAL => "numeric", DOUBLE => "double precision", FLOAT => "float", INTEGER => "integer", INTEGER_ARRAY => "integer[]", OBJECT => "jsonb", TEXT => "text", TEXT_ARRAY => "text[]", TIMESTAMP => "timestamptz", UUID => "uuid", }.freeze
Constants included from DefaultSql
DefaultSql::PG_RESERVED_KEYWORDS, DefaultSql::RESERVED_KEYWORDS
Constants included from ColumnTypes
ColumnTypes::BIGINT, ColumnTypes::BIGINT_ARRAY, ColumnTypes::BOOLEAN, ColumnTypes::COLUMN_TYPES, ColumnTypes::DATE, ColumnTypes::DECIMAL, ColumnTypes::DOUBLE, ColumnTypes::FLOAT, ColumnTypes::INTEGER, ColumnTypes::INTEGER_ARRAY, ColumnTypes::OBJECT, ColumnTypes::TEXT, ColumnTypes::TEXT_ARRAY, ColumnTypes::TIMESTAMP, ColumnTypes::UUID
Constants inherited from Webhookdb::DBAdapter
INVALID_IDENTIFIER_MESSAGE, INVALID_IDENTIFIER_PROMPT, VALID_IDENTIFIER
Instance Attribute Summary
Attributes inherited from Webhookdb::DBAdapter
#name, #table, #targets, #unique
Instance Method Summary
- #add_column_sql(table, column, if_not_exists: false) ⇒ Object
- #column_create_sql(column) ⇒ Object
- #create_index_sql(index, concurrently:) ⇒ Object
- #identifier_quote_char ⇒ Object
- #merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object
- #verify_connection(url, timeout: 2, statement: "SELECT 1") ⇒ Object
Methods included from DefaultSql
#assign_columns_sql, #create_schema_sql, #create_table_sql, #escape_identifier, #qualify_table
Methods inherited from Webhookdb::DBAdapter
adapter, #connection, #create_schema_sql, #create_table_sql, supported_adapters_message, valid_identifier?, validate_identifier!
Instance Method Details
#add_column_sql(table, column, if_not_exists: false) ⇒ Object
45 46 47 48 49 |
# File 'lib/webhookdb/db_adapter/pg.rb', line 45

# Build the ALTER TABLE statement that adds +column+ to +table+.
#
# @param table passed to +qualify_table+ to produce the table reference.
# @param column passed to +column_create_sql+ to produce the column definition.
# @param if_not_exists [Boolean] when truthy, emits ADD COLUMN IF NOT EXISTS.
# @return [String] the ALTER TABLE statement.
def add_column_sql(table, column, if_not_exists: false)
  column_definition = self.column_create_sql(column)
  add_clause = +"ADD COLUMN"
  add_clause << " IF NOT EXISTS" if if_not_exists
  "ALTER TABLE #{self.qualify_table(table)} #{add_clause} #{column_definition}"
end
#column_create_sql(column) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/webhookdb/db_adapter/pg.rb', line 29

# Render the definition of a single column: escaped name, SQL type, and constraint modifiers.
#
# The SQL type is looked up in COLTYPE_MAP (an unmapped type raises from +fetch+).
# Primary keys of type BIGINT/INTEGER are switched to bigserial/serial.
# Only one modifier group is emitted, checked in this order: primary key, unique, not-nullable.
#
# @param column responds to +type+, +name+, +pk?+, +unique?+ and +nullable?+.
# @return [String] e.g. the escaped name followed by the type and any modifiers.
def column_create_sql(column)
  sql_type = COLTYPE_MAP.fetch(column.type)
  constraints =
    if column.pk?
      # Integer primary keys become their auto-incrementing variants.
      sql_type = "bigserial" if column.type == BIGINT
      sql_type = "serial" if column.type == INTEGER
      " PRIMARY KEY"
    elsif column.unique?
      " UNIQUE NOT NULL"
    elsif !column.nullable?
      " NOT NULL"
    else
      ""
    end
  "#{self.escape_identifier(column.name)} #{sql_type}#{constraints}"
end
#create_index_sql(index, concurrently:) ⇒ Object
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/webhookdb/db_adapter/pg.rb', line 18

# Build a CREATE INDEX ... IF NOT EXISTS statement for +index+.
#
# When +index.where+ is set, the partial-index predicate is rendered by building
# a query on Webhookdb::Customer with that condition and stripping the
# leading 'SELECT * FROM "customers" ' so only the WHERE clause remains.
#
# @param index responds to +targets+ (each with a +name+), +unique+, +name+, +table+ and +where+.
# @param concurrently [Boolean] when truthy, emits CREATE INDEX CONCURRENTLY.
# @return [String] the CREATE INDEX statement.
def create_index_sql(index, concurrently:)
  column_list = index.targets.map { |target| self.escape_identifier(target.name) }.join(", ")
  unique_kw = index.unique ? " UNIQUE" : ""
  concurrent_kw = concurrently ? " CONCURRENTLY" : ""
  index_name = self.escape_identifier(index.name)
  table_name = self.qualify_table(index.table)
  predicate =
    if index.where
      " " + Webhookdb::Customer.where(index.where).sql.delete_prefix('SELECT * FROM "customers" ')
    else
      ""
    end
  "CREATE#{unique_kw} INDEX#{concurrent_kw} IF NOT EXISTS #{index_name} ON #{table_name} (#{column_list})#{predicate}"
end
#identifier_quote_char ⇒ Object
14 15 16 |
# File 'lib/webhookdb/db_adapter/pg.rb', line 14

# The quote character for identifiers in this adapter.
#
# @return [String] a single double-quote character.
def identifier_quote_char
  '"'
end
#merge_from_csv(connection, file, table, pk_col, copy_columns) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/webhookdb/db_adapter/pg.rb', line 51

# Upsert the rows of a CSV file into +table+ by way of a temporary staging table.
#
# The CSV is copied into a temp table created LIKE the target table. Staged rows whose
# primary key already exists in the target are applied with an UPDATE; the remaining
# staged rows are added with an INSERT. Both statements are sent together as one string.
#
# @param connection responds to +using+, yielding a handle that responds to +<<+ and +copy_into+.
# @param file CSV data handed to +copy_into+ as +data:+ (COPY options declare a header row).
# @param table responds to +name+ and is accepted by +qualify_table+.
# @param pk_col primary key column (responds to +name+) used to match staged rows to target rows.
# @param copy_columns columns handed to +assign_columns_sql+ to build the UPDATE's SET list.
def merge_from_csv(connection, file, table, pk_col, copy_columns)
  qtable = self.qualify_table(table)
  # Build the staging name from the raw table name and escape the result as a whole.
  # Appending the suffix to an already-escaped name leaves the suffix outside the quotes
  # whenever the table name needs quoting, which is not a valid identifier.
  staging_name = "#{table.name}_staging_#{SecureRandom.hex(4)}"
  temptable = self.escape_identifier(staging_name)
  connection.using do |db|
    # NOTE(review): the temp table is not dropped here; presumably it goes away with the session — confirm.
    db << "CREATE TEMP TABLE #{temptable} (LIKE #{qtable})"
    # NOTE(review): assumes copy_into quotes a Symbol table name itself (as Sequel does),
    # so it receives the unescaped name — confirm.
    db.copy_into(staging_name.to_sym, options: "DELIMITER ',', HEADER true, FORMAT csv", data: file)
    pkname = self.escape_identifier(pk_col.name)
    col_assigns = self.assign_columns_sql("src", nil, copy_columns)
    update_sql = <<~SQL
      UPDATE #{qtable} AS tgt
      SET #{col_assigns}
      FROM (SELECT * FROM #{temptable} WHERE #{pkname} IN (SELECT #{pkname} FROM #{qtable})) src
      WHERE tgt.#{pkname} = src.#{pkname};
    SQL
    insert_sql =
      "INSERT INTO #{qtable} SELECT * FROM #{temptable} WHERE #{pkname} NOT IN (SELECT #{pkname} FROM #{qtable});"
    db << [update_sql, insert_sql].join("\n")
  end
end
#verify_connection(url, timeout: 2, statement: "SELECT 1") ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/webhookdb/db_adapter/pg.rb', line 72

# Check that the database at +url+ can be reached and can run +statement+.
#
# Opens the connection with +timeout+ as the connect timeout, applies the same
# duration (converted to milliseconds) as the session's statement_timeout,
# then executes +statement+.
#
# @param url passed to +connection+ to obtain the connection object.
# @param timeout [Numeric] seconds; defaults to VERIFY_TIMEOUT.
# @param statement [String] SQL to run as the check; defaults to VERIFY_STATEMENT.
# @return whatever +conn.using+ returns for the block.
def verify_connection(url, timeout: VERIFY_TIMEOUT, statement: VERIFY_STATEMENT)
  conn = self.connection(url)
  # Emit a whole number of milliseconds: a Float timeout would otherwise interpolate
  # as e.g. "500.0". Round up so a tiny positive timeout never becomes 0.
  timeout_ms = (timeout * 1000).ceil
  conn.using(connect_timeout: timeout) do |c|
    c.execute("SET statement_timeout TO #{timeout_ms}")
    c.execute(statement)
  end
end