Module: ActiveRecord::ConnectionAdapters::Materialize::Schema::SourceStatements
- Included in:
- ActiveRecord::ConnectionAdapters::MaterializeAdapter
- Defined in:
- lib/active_record/connection_adapters/materialize/schema/source_statements.rb
Instance Method Summary collapse
- #create_source(source_name, publication:, source_type: :postgres, materialized: true, connection_params: nil) ⇒ Object
- #drop_source(source_name) ⇒ Object
-
#select_database_config(connection_params) ⇒ Object
Selects the connection settings that are joined into a connection string such as “host=postgresdb port=5432 user=postgres dbname=source_database”.
- #source_exists?(source_name) ⇒ Boolean
-
#source_options(source_name) ⇒ Object
Get source options from an existing source.
- #sources ⇒ Object
Instance Method Details
#create_source(source_name, publication:, source_type: :postgres, materialized: true, connection_params: nil) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 42
# Creates a source by executing a CREATE [MATERIALIZED] SOURCE statement.
#
# @param source_name [String, Symbol] name of the source, quoted with quote_schema_name
# @param publication [String] publication name, quoted into the PUBLICATION clause
# @param source_type [Symbol] kind of source; only :postgres is supported
# @param materialized [Boolean] whether to emit the MATERIALIZED keyword
# @param connection_params [Hash, nil] handed to select_database_config to build
#   the space-separated key=value connection string
# @return [Object] whatever execute returns
# @raise [NotImplementedError] when source_type is anything other than :postgres
def create_source(source_name, publication:, source_type: :postgres, materialized: true, connection_params: nil)
  # Reject unsupported types before doing any other work. The exception class
  # must come first: raise "message", SomeError fails with TypeError instead.
  unless source_type == :postgres
    raise NotImplementedError, "Source type #{source_type} is currently unsupported for Materialized sources"
  end

  materialized_statement = materialized ? 'MATERIALIZED ' : ''
  connection_string = select_database_config(connection_params).map { |k, v| [k, v].join("=") }.join(" ")

  # squish collapses the heredoc into a single-line statement.
  execute <<-SQL.squish
    CREATE #{materialized_statement}SOURCE #{quote_schema_name(source_name)} FROM POSTGRES
    CONNECTION #{quote(connection_string)}
    PUBLICATION #{quote(publication)}
  SQL
end
#drop_source(source_name) ⇒ Object
57 58 |
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 57
# Drops a source by executing a DROP SOURCE statement.
#
# @param source_name [String, Symbol] name of the source to drop
# @return [Object] whatever execute returns
def drop_source(source_name)
  # The body used to be empty, so calling this silently left the source in place.
  execute "DROP SOURCE #{quote_table_name(source_name)}"
end
#select_database_config(connection_params) ⇒ Object
Selects the connection settings that are joined into a connection string such as “host=postgresdb port=5432 user=postgres dbname=source_database”.
32 33 34 35 36 37 38 39 40 |
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 32
# Picks the connection settings out of connection_params and renames them to
# host, port, user, dbname and password. Settings whose value is nil are left out.
#
# @param connection_params [Hash] settings keyed by 'host', 'port', 'username',
#   'database' and 'password'; string or symbol keys are both accepted
# @return [Hash{Symbol => Object}] the settings that are present, in the order
#   host, port, user, dbname, password
# @raise [ArgumentError] when connection_params is nil
def select_database_config(connection_params)
  # Fail with a clear message instead of NoMethodError on nil['host'].
  raise ArgumentError, "connection_params must be provided" if connection_params.nil?

  {
    host: "host",
    port: "port",
    user: "username",
    dbname: "database",
    password: "password"
  }.each_with_object({}) do |(setting, param_key), config|
    value = connection_params[param_key]
    # Fall back to the symbol form so symbol-keyed hashes work too.
    value = connection_params[param_key.to_sym] if value.nil?
    config[setting] = value unless value.nil?
  end
end
#source_exists?(source_name) ⇒ Boolean
60 61 |
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 60
# Checks whether a source with the given name appears in the sources listing.
#
# @param source_name [String, Symbol] name of the source to look for
# @return [Boolean] true when the name is listed, false otherwise
def source_exists?(source_name)
  # The body used to be empty, so this predicate always returned nil.
  # NOTE(review): assumes sources returns plain source names as strings - confirm.
  sources.include?(source_name.to_s)
end
#source_options(source_name) ⇒ Object
Get source options from an existing source
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 14
# Get source options from an existing source.
#
# Runs SHOW CREATE SOURCE and reads the options back out of the first row,
# which is taken to be a dotted name reference plus the create statement.
#
# @param source_name [String, Symbol] name of the source to inspect
# @return [Hash] with keys :database_name, :schema_name, :source_name,
#   :source_type, :publication and :materialized
def source_options(source_name)
  name_ref, statement = query("SHOW CREATE SOURCE #{quote_table_name(source_name)}", "SCHEMA").first
  # NOTE(review): the third dotted part of the name reference is reported as
  # :publication; it looks like the source's own name rather than the
  # publication named in the statement - confirm before relying on it.
  database_name, schema_name, publication_name = name_ref.split(".")

  materialized = /CREATE\sMATERIALIZED/.match?(statement)
  # The matched text is "FROM <TYPE> ... CONNECTION"; its second word is the
  # type, so "KAFKA BROKER" becomes :kafka and no match becomes :"".
  _, source_type, = /FROM\s(POSTGRES|KAFKA\sBROKER)\sCONNECTION/.match(statement).to_s.split " "
  source_type = source_type.to_s.downcase.to_sym

  {
    database_name: database_name,
    schema_name: schema_name,
    source_name: source_name,
    source_type: source_type,
    publication: publication_name,
    # :postgres sources are always reported as materialized, keyword or not.
    materialized: materialized || source_type == :postgres
  }
end
#sources ⇒ Object
9 10 11 |
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 9
# Lists sources by running the data-source query restricted to type "SOURCE".
#
# @return [Object] whatever query_values returns for that SQL
def sources
  source_listing_sql = data_source_sql(type: "SOURCE")
  query_values(source_listing_sql, "SCHEMA")
end