Class: PgSync::DataSource
- Inherits:
-
Object
- Object
- PgSync::DataSource
show all
- Includes:
- Utils
- Defined in:
- lib/pgsync/data_source.rb
Constant Summary
Constants included
from Utils
Utils::COLOR_CODES
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Utils
#colorize, #confirm_tables_exist, #db_config_file, #deprecated, #escape, #first_schema, #friendly_name, #log, #monotonic_time, #output, #quote_ident, #quote_ident_full, #quote_string, #task_name, #warning
Constructor Details
#initialize(url, name:, debug:) ⇒ DataSource
Returns a new instance of DataSource.
7
8
9
10
11
|
# File 'lib/pgsync/data_source.rb', line 7
def initialize(url, name:, debug:)
@url = url
@name = name
@debug = debug
end
|
Instance Attribute Details
#url ⇒ Object
Returns the value of attribute url.
5
6
7
|
# File 'lib/pgsync/data_source.rb', line 5
def url
@url
end
|
Instance Method Details
#close ⇒ Object
120
121
122
123
124
125
|
# File 'lib/pgsync/data_source.rb', line 120
def close
if @conn
@conn.close
@conn = nil
end
end
|
#conn ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/pgsync/data_source.rb', line 103
def conn
@conn ||= begin
begin
ENV["PGCONNECT_TIMEOUT"] ||= "3"
if @url.start_with?("postgres://", "postgresql://")
config = @url
else
config = {dbname: @url}
end
@concurrent_id = concurrent_id
PG::Connection.new(config)
rescue URI::InvalidURIError
raise Error, "Invalid connection string. Make sure it works with `psql`"
end
end
end
|
#create_schema(schema) ⇒ Object
84
85
86
|
# File 'lib/pgsync/data_source.rb', line 84
def create_schema(schema)
execute("CREATE SCHEMA #{quote_ident(schema)}")
end
|
#dbname ⇒ Object
29
30
31
|
# File 'lib/pgsync/data_source.rb', line 29
def dbname
@dbname ||= conninfo[:dbname]
end
|
#execute(query, params = []) ⇒ Object
140
141
142
143
|
# File 'lib/pgsync/data_source.rb', line 140
def execute(query, params = [])
log_sql query, params
conn.exec_params(query, params).to_a
end
|
#exists? ⇒ Boolean
13
14
15
|
# File 'lib/pgsync/data_source.rb', line 13
def exists?
@url && @url.size > 0
end
|
#host ⇒ Object
21
22
23
|
# File 'lib/pgsync/data_source.rb', line 21
def host
@host ||= dedup_localhost(conninfo[:host])
end
|
#last_value(seq) ⇒ Object
63
64
65
|
# File 'lib/pgsync/data_source.rb', line 63
def last_value(seq)
execute("SELECT last_value FROM #{quote_ident_full(seq)}").first["last_value"]
end
|
#local? ⇒ Boolean
17
18
19
|
# File 'lib/pgsync/data_source.rb', line 17
def local?
!host || %w(localhost 127.0.0.1).include?(host)
end
|
#log_sql(query, params = {}) ⇒ Object
TODO log time for each statement
161
162
163
164
165
166
167
|
# File 'lib/pgsync/data_source.rb', line 161
def log_sql(query, params = {})
if @debug
message = "#{colorize("[#{@name}]", :cyan)} #{query.gsub(/\s+/, " ").strip}"
message = "#{message} #{params.inspect}" if params.any?
log message
end
end
|
#max_id(table, primary_key, sql_clause = nil) ⇒ Object
55
56
57
|
# File 'lib/pgsync/data_source.rb', line 55
def max_id(table, primary_key, sql_clause = nil)
execute("SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}").first["max"].to_i
end
|
#min_id(table, primary_key, sql_clause = nil) ⇒ Object
59
60
61
|
# File 'lib/pgsync/data_source.rb', line 59
def min_id(table, primary_key, sql_clause = nil)
execute("SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident_full(table)}#{sql_clause}").first["min"].to_i
end
|
#port ⇒ Object
25
26
27
|
# File 'lib/pgsync/data_source.rb', line 25
def port
@port ||= dedup_localhost(conninfo[:port])
end
|
#reconnect_if_needed ⇒ Object
reconnect for new thread or process
128
129
130
|
# File 'lib/pgsync/data_source.rb', line 128
def reconnect_if_needed
reconnect if @concurrent_id != concurrent_id
end
|
#schemas ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/pgsync/data_source.rb', line 71
def schemas
@schemas ||= begin
query = <<~SQL
SELECT
schema_name
FROM
information_schema.schemata
ORDER BY 1
SQL
execute(query).map { |row| row["schema_name"] }
end
end
|
#search_path ⇒ Object
132
133
134
|
# File 'lib/pgsync/data_source.rb', line 132
def search_path
@search_path ||= execute("SELECT unnest(current_schemas(true)) AS schema").map { |r| r["schema"] }
end
|
#server_version_num ⇒ Object
136
137
138
|
# File 'lib/pgsync/data_source.rb', line 136
def server_version_num
@server_version_num ||= execute("SHOW server_version_num").first["server_version_num"].to_i
end
|
#table_exists?(table) ⇒ Boolean
51
52
53
|
# File 'lib/pgsync/data_source.rb', line 51
def table_exists?(table)
table_set.include?(table)
end
|
#tables ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/pgsync/data_source.rb', line 34
def tables
@tables ||= begin
query = <<~SQL
SELECT
table_schema AS schema,
table_name AS table
FROM
information_schema.tables
WHERE
table_type = 'BASE TABLE' AND
table_schema NOT IN ('information_schema', 'pg_catalog')
ORDER BY 1, 2
SQL
execute(query).map { |row| Table.new(row["schema"], row["table"]) }
end
end
|
#transaction ⇒ Object
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
# File 'lib/pgsync/data_source.rb', line 145
def transaction
if conn.transaction_status == 0
log_sql "BEGIN"
result =
conn.transaction do
yield
end
log_sql "COMMIT"
result
else
yield
end
end
|
#triggers(table) ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/pgsync/data_source.rb', line 88
def triggers(table)
query = <<~SQL
SELECT
tgname AS name,
tgisinternal AS internal,
tgenabled != 'D' AS enabled,
tgconstraint != 0 AS integrity
FROM
pg_trigger
WHERE
pg_trigger.tgrelid = $1::regclass
SQL
execute(query, [quote_ident_full(table)])
end
|
#truncate(table) ⇒ Object
67
68
69
|
# File 'lib/pgsync/data_source.rb', line 67
def truncate(table)
execute("TRUNCATE #{quote_ident_full(table)} CASCADE")
end
|