Module: ActiveRecord::ConnectionAdapters::Materialize::Schema::SourceStatements

Included in:
ActiveRecord::ConnectionAdapters::MaterializeAdapter
Defined in:
lib/active_record/connection_adapters/materialize/schema/source_statements.rb

Instance Method Summary

Instance Method Details

#create_source(source_name, publication:, source_type: :postgres, materialized: true, connection_params: nil) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 42

# Issues a CREATE SOURCE statement for a source that reads from an upstream
# database publication.
#
# Only the +:postgres+ source type is handled; any other value raises.
#
# @param source_name name of the source to create (quoted with +quote_schema_name+)
# @param publication name of the upstream publication (quoted with +quote+)
# @param source_type [Symbol] kind of upstream system; only +:postgres+ is supported
# @param materialized [Boolean] when truthy, emits CREATE MATERIALIZED SOURCE
# @param connection_params settings handed to +select_database_config+ to
#   build the space-separated "key=value" CONNECTION string
# @raise [NotImplementedError] if +source_type+ is not +:postgres+
def create_source(source_name, publication:, source_type: :postgres, materialized: true, connection_params: nil)
  case source_type
  when :postgres
    materialized_statement = materialized ? 'MATERIALIZED ' : ''
    # Built only for supported source types, so an unsupported type is
    # reported as such instead of failing while reading connection_params.
    connection_string = select_database_config(connection_params)
                        .map { |key, value| [key, value].join("=") }
                        .join(" ")
    execute <<~SQL.squish
      CREATE #{materialized_statement}SOURCE #{quote_schema_name(source_name)} FROM POSTGRES
        CONNECTION #{quote(connection_string)}
        PUBLICATION #{quote(publication)}
    SQL
  else
    # Kernel#raise expects the exception class first and the message second;
    # the reversed order raises TypeError instead of the intended error.
    raise NotImplementedError, "Source type #{source_type} is currently unsupported for Materialized sources"
  end
end

#drop_source(source_name) ⇒ Object



57
58
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 57

# Placeholder with an empty body: performs no work and returns nil.
# TODO: not implemented yet; +source_name+ is currently ignored.
def drop_source(source_name); end

#select_database_config(connection_params) ⇒ Object

Selects the connection settings used to build a source's connection string, for example: "host=postgresdb port=5432 user=postgres dbname=source_database"



32
33
34
35
36
37
38
39
40
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 32

# Picks the connection settings out of +connection_params+ and renames them
# to the keys used when building a connection string
# (username -> user, database -> dbname).
#
# Each setting is looked up by its string key first and then by the
# equivalent symbol key, so both string- and symbol-keyed hashes work.
# Settings whose value is nil are left out of the result.
#
# @param connection_params [Hash] settings keyed by 'host', 'port',
#   'username', 'database' and 'password' (strings or symbols)
# @return [Hash{Symbol => Object}] the present settings under the keys
#   :host, :port, :user, :dbname and :password, in that order
# @raise [ArgumentError] if +connection_params+ is nil
def select_database_config(connection_params)
  raise ArgumentError, 'connection_params must be provided' if connection_params.nil?

  {
    host: 'host',
    port: 'port',
    user: 'username',
    dbname: 'database',
    password: 'password'
  }.each_with_object({}) do |(target_key, source_key), config|
    value = connection_params[source_key]
    # Fall back to the symbol key only when the string key gave nothing,
    # so string-keyed input behaves exactly as before.
    value = connection_params[source_key.to_sym] if value.nil?
    config[target_key] = value unless value.nil?
  end
end

#source_exists?(source_name) ⇒ Boolean

Returns:

  • (Boolean)


60
61
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 60

# Placeholder with an empty body: always returns nil (falsy), whatever
# +source_name+ is.
# TODO: not implemented yet; no existence check is performed.
def source_exists?(source_name); end

#source_options(source_name) ⇒ Object

Get source options from an existing source



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 14

# Reads back the options of an existing source by running
# SHOW CREATE SOURCE and parsing the returned name and statement.
#
# @param source_name name of the source to inspect
# @return [Hash] with the keys :database_name, :schema_name, :source_name,
#   :source_type, :publication and :materialized
def source_options(source_name)
  name_ref, statement = query("SHOW CREATE SOURCE #{quote_table_name(source_name)}", "SCHEMA").first
  # NOTE(review): assumes the first column is a dot-separated
  # "database.schema.name" reference — confirm against the server output.
  database_name, schema_name, relation_name = name_ref.split(".")

  materialized = /CREATE\sMATERIALIZED/.match?(statement)

  # The first word of the captured keyword is the type ("KAFKA BROKER" -> kafka);
  # an unrecognised statement yields the empty symbol, as before.
  type_match = /FROM\s(POSTGRES|KAFKA\sBROKER)\sCONNECTION/.match(statement)
  source_type = (type_match ? type_match[1].split.first : "").downcase.to_sym

  # The publication is named in the statement's PUBLICATION clause; the last
  # component of the name reference is the source's own name, so it is used
  # only as a fallback when the clause is absent.
  publication_match = /\bPUBLICATION\s+'([^']+)'/.match(statement)
  publication = publication_match ? publication_match[1] : relation_name

  {
    database_name: database_name,
    schema_name: schema_name,
    source_name: source_name,
    source_type: source_type,
    publication: publication,
    # A :postgres source is reported as materialized even without the keyword.
    materialized: materialized || source_type == :postgres
  }
end

#sources ⇒ Object



9
10
11
# File 'lib/active_record/connection_adapters/materialize/schema/source_statements.rb', line 9

# Runs the data-source listing query restricted to type "SOURCE" and
# returns the values it yields.
#
# @return the result of +query_values+ for that query
def sources
  source_listing_sql = data_source_sql(type: "SOURCE")
  query_values(source_listing_sql, "SCHEMA")
end