Class: PgSync::DataSource

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/pgsync/data_source.rb

Constant Summary

Constants included from Utils

Utils::COLOR_CODES

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils

#colorize, #confirm_tables_exist, #db_config_file, #deprecated, #escape, #first_schema, #friendly_name, #log, #monotonic_time, #output, #quote_ident, #quote_ident_full, #quote_string, #task_name, #warning

Constructor Details

#initialize(url, name:, debug:) ⇒ DataSource

Returns a new instance of DataSource.



7
8
9
10
11
# File 'lib/pgsync/data_source.rb', line 7

# Stores the settings for this data source. No connection is opened here;
# the values are only kept in instance variables for later use.
#
# @param url [Object] connection target, kept in @url
# @param name [Object] label for this source, kept in @name
# @param debug [Object] flag kept in @debug (checked by #log_sql)
def initialize(url, name:, debug:)
  @url, @name, @debug = url, name, debug
end

Instance Attribute Details

#url ⇒ Object (readonly)

Returns the value of attribute url.



5
6
7
# File 'lib/pgsync/data_source.rb', line 5

# Read-only accessor for the connection target given to the constructor.
#
# @return [Object] the value held in @url
def url
  @url
end

Instance Method Details

#close ⇒ Object



120
121
122
123
124
125
# File 'lib/pgsync/data_source.rb', line 120

# Closes the cached connection, if there is one, and forgets it.
# Does nothing when no connection is currently cached.
#
# @return [nil]
def close
  return unless @conn

  @conn.close
  @conn = nil
end

#conn ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/pgsync/data_source.rb', line 103

# Returns the cached connection, creating it on first use.
#
# PGCONNECT_TIMEOUT is set to "3" in the environment when it has no value yet.
# A @url beginning with postgres:// or postgresql:// is handed to
# PG::Connection as-is; any other value is treated as a database name.
# The current concurrent_id is recorded in @concurrent_id just before
# the connection is created.
#
# @return [PG::Connection]
# @raise [Error] when URI::InvalidURIError is raised while connecting
def conn
  return @conn if @conn

  ENV["PGCONNECT_TIMEOUT"] ||= "3"
  uri_style = @url.start_with?("postgres://", "postgresql://")
  config = uri_style ? @url : {dbname: @url}
  @concurrent_id = concurrent_id
  @conn = PG::Connection.new(config)
rescue URI::InvalidURIError
  raise Error, "Invalid connection string. Make sure it works with `psql`"
end

#create_schema(schema) ⇒ Object



84
85
86
# File 'lib/pgsync/data_source.rb', line 84

# Issues a CREATE SCHEMA statement for the given schema.
#
# @param schema [Object] schema name; passed through quote_ident first
# @return [Object] whatever #execute returns
def create_schema(schema)
  quoted = quote_ident(schema)
  execute("CREATE SCHEMA #{quoted}")
end

#dbname ⇒ Object



29
30
31
# File 'lib/pgsync/data_source.rb', line 29

# Database name taken from conninfo[:dbname]; cached in @dbname once a
# truthy value has been obtained.
#
# @return [Object] the :dbname entry of conninfo
def dbname
  return @dbname if @dbname

  @dbname = conninfo[:dbname]
end

#execute(query, params = []) ⇒ Object



140
141
142
143
# File 'lib/pgsync/data_source.rb', line 140

# Logs the statement via #log_sql, runs it on the connection with
# exec_params, and returns the result converted with to_a.
#
# @param query [String] SQL text
# @param params [Array] bind parameters handed to exec_params
# @return [Array] the result of calling to_a on the exec_params result
def execute(query, params = [])
  log_sql(query, params)
  result = conn.exec_params(query, params)
  result.to_a
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/pgsync/data_source.rb', line 13

# Whether a connection target was supplied for this data source.
#
# Always answers with a real boolean: a missing (nil) @url yields false
# rather than nil, and an empty string yields false.
#
# @return [Boolean] true when @url is present and not empty
def exists?
  @url ? !@url.empty? : false
end

#host ⇒ Object



21
22
23
# File 'lib/pgsync/data_source.rb', line 21

# Host of the connection: conninfo[:host] passed through dedup_localhost.
# The result is cached in @host once it is truthy.
#
# @return [Object] the value returned by dedup_localhost
def host
  @host ||= begin
    raw_host = conninfo[:host]
    dedup_localhost(raw_host)
  end
end

#last_value(seq) ⇒ Object



63
64
65
# File 'lib/pgsync/data_source.rb', line 63

# Reads the last_value column from the given sequence.
#
# @param seq [Object] sequence reference; quoted with quote_ident_full
# @return [Object] the "last_value" entry of the first returned row
def last_value(seq)
  rows = execute("SELECT last_value FROM #{quote_ident_full(seq)}")
  rows.first["last_value"]
end

#local? ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/pgsync/data_source.rb', line 17

# Whether the connection points at the local machine: true when there is
# no host at all, or when the host is "localhost" or "127.0.0.1".
#
# @return [Boolean]
def local?
  current = host
  !current || %w(localhost 127.0.0.1).include?(current)
end

#log_sql(query, params = {}) ⇒ Object

TODO log time for each statement



161
162
163
164
165
166
167
# File 'lib/pgsync/data_source.rb', line 161

# Writes a statement to the log when debug mode is on; otherwise does nothing.
#
# The line consists of the source name in brackets (colorized cyan), the
# query with every whitespace run collapsed to one space and trimmed, and,
# when there are any, the inspected params.
#
# @param query [String] SQL text
# @param params [#any?, #inspect] bind parameters; omitted from the line when empty
# @return [Object, nil] the result of log, or nil when @debug is falsy
def log_sql(query, params = {})
  return unless @debug

  prefix = colorize("[#{@name}]", :cyan)
  statement = query.gsub(/\s+/, " ").strip
  message = "#{prefix} #{statement}"
  message = "#{message} #{params.inspect}" if params.any?
  log message
end

#max_id(table, primary_key, sql_clause = nil) ⇒ Object



55
56
57
# File 'lib/pgsync/data_source.rb', line 55

# Largest value of the primary key column in a table.
#
# @param table [Object] table reference; quoted with quote_ident_full
# @param primary_key [Object] column name; quoted with quote_ident
# @param sql_clause [String, nil] text appended verbatim right after the
#   table name (no separator is inserted)
# @return [Integer] the "max" column of the first row, converted with to_i
def max_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  row = execute("SELECT MAX(#{column}) FROM #{relation}#{sql_clause}").first
  row["max"].to_i
end

#min_id(table, primary_key, sql_clause = nil) ⇒ Object



59
60
61
# File 'lib/pgsync/data_source.rb', line 59

# Smallest value of the primary key column in a table.
#
# @param table [Object] table reference; quoted with quote_ident_full
# @param primary_key [Object] column name; quoted with quote_ident
# @param sql_clause [String, nil] text appended verbatim right after the
#   table name (no separator is inserted)
# @return [Integer] the "min" column of the first row, converted with to_i
def min_id(table, primary_key, sql_clause = nil)
  column = quote_ident(primary_key)
  relation = quote_ident_full(table)
  row = execute("SELECT MIN(#{column}) FROM #{relation}#{sql_clause}").first
  row["min"].to_i
end

#port ⇒ Object



25
26
27
# File 'lib/pgsync/data_source.rb', line 25

# Port of the connection: conninfo[:port] passed through dedup_localhost.
# The result is cached in @port once it is truthy.
#
# @return [Object] the value returned by dedup_localhost
def port
  @port ||= begin
    raw_port = conninfo[:port]
    dedup_localhost(raw_port)
  end
end

#reconnect_if_needed ⇒ Object

reconnect for new thread or process



128
129
130
# File 'lib/pgsync/data_source.rb', line 128

# Reconnects when the current concurrent_id no longer matches the one
# recorded in @concurrent_id (i.e. we are in a new thread or process).
#
# @return [Object, nil] the result of reconnect, or nil when nothing was done
def reconnect_if_needed
  return if @concurrent_id == concurrent_id

  reconnect
end

#schemas ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/pgsync/data_source.rb', line 71

# Names of the schemas listed in information_schema.schemata, ordered by
# name. The list is fetched once and cached in @schemas.
#
# @return [Array] the schema_name value of every returned row
def schemas
  return @schemas if @schemas

  sql = <<~SQL
    SELECT
      schema_name
    FROM
      information_schema.schemata
    ORDER BY 1
  SQL
  @schemas = execute(sql).map { |record| record["schema_name"] }
end

#search_path ⇒ Object



132
133
134
# File 'lib/pgsync/data_source.rb', line 132

# Schemas reported by current_schemas(true), one entry per schema.
# The list is fetched once and cached in @search_path.
#
# @return [Array] the "schema" value of every returned row
def search_path
  @search_path ||= begin
    rows = execute("SELECT unnest(current_schemas(true)) AS schema")
    rows.map { |row| row["schema"] }
  end
end

#server_version_num ⇒ Object



136
137
138
# File 'lib/pgsync/data_source.rb', line 136

# Result of SHOW server_version_num as an integer; fetched once and
# cached in @server_version_num.
#
# @return [Integer]
def server_version_num
  @server_version_num ||= begin
    row = execute("SHOW server_version_num").first
    row["server_version_num"].to_i
  end
end

#table_exists?(table) ⇒ Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/pgsync/data_source.rb', line 51

# Whether the given table is a member of table_set.
#
# @param table [Object] table to look up
# @return [Boolean]
def table_exists?(table)
  known_tables = table_set
  known_tables.include?(table)
end

#tables ⇒ Object

gets visible tables



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/pgsync/data_source.rb', line 34

# Base tables listed in information_schema.tables, excluding the
# information_schema and pg_catalog schemas, ordered by schema then table.
# Each row becomes a Table built from its schema and table name.
# The list is fetched once and cached in @tables.
#
# @return [Array<Table>]
def tables
  return @tables if @tables

  sql = <<~SQL
    SELECT
      table_schema AS schema,
      table_name AS table
    FROM
      information_schema.tables
    WHERE
      table_type = 'BASE TABLE' AND
      table_schema NOT IN ('information_schema', 'pg_catalog')
    ORDER BY 1, 2
  SQL
  @tables = execute(sql).map do |record|
    schema, name = record.values_at("schema", "table")
    Table.new(schema, name)
  end
end

#transaction ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/pgsync/data_source.rb', line 145

# Runs the given block inside a transaction.
#
# When the connection's transaction_status is not 0 the block is simply
# yielded to. Otherwise a transaction is opened through the connection,
# with BEGIN logged before it and COMMIT logged after it returns.
#
# @yield the work to perform
# @return [Object] the block's result when already in a transaction,
#   otherwise whatever the connection's transaction call returns
def transaction
  # a non-zero status means we are already inside a transaction
  return yield unless conn.transaction_status == 0

  log_sql "BEGIN"
  result = conn.transaction { yield }
  log_sql "COMMIT"
  result
end

#triggers(table) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/pgsync/data_source.rb', line 88

# Looks up the triggers attached to a table in pg_trigger.
#
# Each row carries the columns name, internal, enabled (tgenabled != 'D')
# and integrity (tgconstraint != 0). The fully quoted table name is passed
# as the single bind parameter and cast to regclass by the query.
#
# @param table [Object] table reference; quoted with quote_ident_full
# @return [Object] whatever #execute returns for the query
def triggers(table)
  sql = <<~SQL
    SELECT
      tgname AS name,
      tgisinternal AS internal,
      tgenabled != 'D' AS enabled,
      tgconstraint != 0 AS integrity
    FROM
      pg_trigger
    WHERE
      pg_trigger.tgrelid = $1::regclass
  SQL
  relation = quote_ident_full(table)
  execute(sql, [relation])
end

#truncate(table) ⇒ Object



67
68
69
# File 'lib/pgsync/data_source.rb', line 67

# Issues TRUNCATE ... CASCADE for the given table.
#
# @param table [Object] table reference; quoted with quote_ident_full
# @return [Object] whatever #execute returns
def truncate(table)
  target = quote_ident_full(table)
  execute("TRUNCATE #{target} CASCADE")
end