Class: Bricolage::PostgresConnection
- Inherits:
- Object
- Inheritance chain: Object → Bricolage::PostgresConnection
- Defined in:
- lib/bricolage/postgresconnection.rb
Defined Under Namespace
Classes: Cursor, Transaction
Class Method Summary
collapse
-
.install_signal_handlers ⇒ Object
-
.open_data_source(ds) ⇒ Object
Instance Method Summary
collapse
-
#analyze(table) ⇒ Object
-
#cancel ⇒ Object
-
#cancel_force ⇒ Object
-
#close ⇒ Object
-
#close_force ⇒ Object
-
#closed? ⇒ Boolean
-
#drop_table(name) ⇒ Object
-
#drop_table_force(name) ⇒ Object
-
#execute_query(query, &block) ⇒ Object
(also: #query)
-
#execute_update(query) ⇒ Object
(also: #execute, #update)
-
#in_transaction? ⇒ Boolean
-
#initialize(connection, ds, logger) ⇒ PostgresConnection
constructor
A new instance of PostgresConnection.
-
#lock(table) ⇒ Object
-
#log_elapsed_time ⇒ Object
-
#log_query(query) ⇒ Object
-
#make_unique_cursor_name ⇒ Object
-
#open_cursor(query, name = nil) {|Cursor.new(name, self, @logger)| ... } ⇒ Object
-
#query_batch(query, batch_size = 5000, &block) ⇒ Object
-
#query_row(query) ⇒ Object
-
#query_rows(query) ⇒ Object
-
#query_value(query) ⇒ Object
-
#query_values(query) ⇒ Object
-
#select(table, &block) ⇒ Object
-
#source ⇒ Object
-
#transaction ⇒ Object
-
#vacuum(table) ⇒ Object
-
#vacuum_sort_only(table) ⇒ Object
Constructor Details
#initialize(connection, ds, logger) ⇒ PostgresConnection
Returns a new instance of PostgresConnection.
38
39
40
41
42
43
44
|
# File 'lib/bricolage/postgresconnection.rb', line 38
# Wraps an existing connection object together with its data source and logger.
#
# @param connection the underlying connection object (stored in @connection)
# @param ds the data source object (stored in @ds)
# @param logger the logger (stored in @logger)
def initialize(connection, ds, logger)
  @connection, @ds, @logger = connection, ds, logger
  # Both state flags start out cleared.
  @closed = @connection_failed = false
end
|
Class Method Details
.install_signal_handlers ⇒ Object
10
11
12
13
14
15
|
# File 'lib/bricolage/postgresconnection.rb', line 10
# Installs a SIGTERM handler that writes 'receive SIGTERM' to standard error
# and then raises Interrupt with the message 'SIGTERM'.
#
# @return whatever Signal.trap returns
def PostgresConnection.install_signal_handlers
  Signal.trap(:TERM) do
    $stderr.puts 'receive SIGTERM'
    raise Interrupt, 'SIGTERM'
  end
end
|
.open_data_source(ds) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
|
# File 'lib/bricolage/postgresconnection.rb', line 17
# Opens a connection for the given data source via _open_ds.
#
# Without a block the connection is returned and the caller owns it.
# With a block the connection is yielded and then closed with #close_force,
# whether or not the block raises; the block's value is returned.
#
# @param ds the data source to open
def PostgresConnection.open_data_source(ds)
  connection = _open_ds(ds)
  return connection unless block_given?

  begin
    yield connection
  ensure
    connection.close_force
  end
end
|
Instance Method Details
#analyze(table) ⇒ Object
257
258
259
|
# File 'lib/bricolage/postgresconnection.rb', line 257
# Issues an "analyze" statement for the given table through #execute.
#
# @param table table name, interpolated into the SQL text verbatim
# @return the result of #execute
def analyze(table)
  statement = "analyze #{table};"
  execute(statement)
end
|
#cancel ⇒ Object
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/bricolage/postgresconnection.rb', line 69
# Asks the underlying connection to cancel the running query.
#
# A truthy return value from @connection.cancel is treated as an error
# description: it is logged and reported as a PostgreSQLException.
#
# @return the result of the final @logger.info call on success
# @raise [PostgreSQLException] when the cancel request reports an error
def cancel
  @logger.info "cancelling PostgreSQL query..."
  failure = @connection.cancel
  return @logger.info("successfully cancelled") unless failure

  @logger.error "could not cancel query: #{failure}"
  raise PostgreSQLException, "could not cancel query: #{failure}"
end
|
#cancel_force ⇒ Object
64
65
66
67
|
# File 'lib/bricolage/postgresconnection.rb', line 64
# Best-effort variant of #cancel: a PostgreSQLException raised by #cancel is
# swallowed. Any other exception still propagates.
#
# @return the result of #cancel, or nil when a PostgreSQLException was swallowed
def cancel_force
  begin
    cancel
  rescue PostgreSQLException
    nil
  end
end
|
#close ⇒ Object
50
51
52
53
|
# File 'lib/bricolage/postgresconnection.rb', line 50
# Closes the underlying connection and then marks this wrapper as closed.
# If @connection.close raises, the closed flag is left untouched.
#
# @return [true]
def close
  @connection.close
  @closed = true
end
|
#close_force ⇒ Object
55
56
57
58
|
# File 'lib/bricolage/postgresconnection.rb', line 55
# Best-effort variant of #close: any StandardError raised while closing is
# swallowed.
#
# @return the result of #close, or nil when an error was swallowed
def close_force
  begin
    close
  rescue StandardError
    nil
  end
end
|
#closed? ⇒ Boolean
60
61
62
|
# File 'lib/bricolage/postgresconnection.rb', line 60
# Reports the value of the closed flag (@closed), which #close sets to true.
#
# @return [Boolean]
def closed?
  @closed
end
|
#drop_table(name) ⇒ Object
241
242
243
|
# File 'lib/bricolage/postgresconnection.rb', line 241
# Issues "drop table ... cascade" for the given table through #execute.
#
# @param name table name, interpolated into the SQL text verbatim
# @return the result of #execute
def drop_table(name)
  statement = "drop table #{name} cascade;"
  execute(statement)
end
|
#drop_table_force(name) ⇒ Object
245
246
247
|
# File 'lib/bricolage/postgresconnection.rb', line 245
# Issues "drop table if exists ... cascade" for the given table through
# #execute.
#
# @param name table name, interpolated into the SQL text verbatim
# @return the result of #execute
def drop_table_force(name)
  statement = "drop table if exists #{name} cascade;"
  execute(statement)
end
|
#execute_query(query, &block) ⇒ Object
Also known as:
query
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# File 'lib/bricolage/postgresconnection.rb', line 130
# Runs a query and yields the result object to the caller's block.
#
# The query text is logged with #log_query, the call to
# @connection.async_exec is wrapped in #querying and timed with
# #log_elapsed_time. The result object is cleared once the block has
# finished, including when it raises, so it must not be used afterwards.
#
# @param query [String] SQL text
# @yield [rs] the result returned by @connection.async_exec
# @return the value of the block
# @raise [ConnectionError] wrapping PG::ConnectionBad / PG::UnableToSend;
#   @connection_failed is set to true in that case
# @raise [PostgreSQLException] wrapping any other PG::Error
def execute_query(query, &block)
  log_query query
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  yield result
rescue PG::ConnectionBad, PG::UnableToSend => conn_error
  @connection_failed = true
  raise ConnectionError.wrap(conn_error)
rescue PG::Error => pg_error
  raise PostgreSQLException.wrap(pg_error)
ensure
  # result is nil when the failure happened before the query returned.
  result.clear if result
end
|
#execute_update(query) ⇒ Object
Also known as:
execute, update
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/bricolage/postgresconnection.rb', line 89
# Runs a statement and returns its rows as an array.
#
# The statement text is logged with #log_query, the call to
# @connection.async_exec is wrapped in #querying and timed with
# #log_elapsed_time. The result object is converted with to_a and then
# cleared.
#
# @param query [String] SQL text
# @return [Array] result.to_a
# @raise [ConnectionError] wrapping PG::ConnectionBad / PG::UnableToSend;
#   @connection_failed is set to true in that case
# @raise [PostgreSQLException] wrapping any other PG::Error
def execute_update(query)
  log_query query
  result = log_elapsed_time do
    querying { @connection.async_exec(query) }
  end
  result.to_a
rescue PG::ConnectionBad, PG::UnableToSend => conn_error
  @connection_failed = true
  raise ConnectionError.wrap(conn_error)
rescue PG::Error => pg_error
  raise PostgreSQLException.wrap(pg_error)
ensure
  # result is nil when the failure happened before the statement returned.
  result.clear if result
end
|
#in_transaction? ⇒ Boolean
155
156
157
|
# File 'lib/bricolage/postgresconnection.rb', line 155
# Tells whether the connection's transaction_status equals
# PG::Constants::PQTRANS_INTRANS.
#
# @return [Boolean]
def in_transaction?
  status = @connection.transaction_status
  status == PG::Constants::PQTRANS_INTRANS
end
|
#lock(table) ⇒ Object
261
262
263
|
# File 'lib/bricolage/postgresconnection.rb', line 261
# Issues a "lock" statement for the given table through #execute.
#
# @param table table name, interpolated into the SQL text verbatim
# @return the result of #execute
def lock(table)
  statement = "lock #{table}"
  execute statement
end
|
#log_elapsed_time ⇒ Object
274
275
276
277
278
279
280
281
|
# File 'lib/bricolage/postgresconnection.rb', line 274
# Runs the given block and logs how long it took, in seconds with one
# decimal place, at the data source's SQL log level. The duration is logged
# even when the block raises.
#
# The duration is taken from the monotonic clock rather than the wall clock,
# so a system time adjustment during the block cannot produce a negative or
# inflated value.
#
# @yield the work to be timed
# @return the value of the block
def log_elapsed_time
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  @logger.log(@ds.sql_log_level) { "#{'%.1f' % elapsed} secs" }
end
|
#log_query(query) ⇒ Object
265
266
267
|
# File 'lib/bricolage/postgresconnection.rb', line 265
# Logs the query text at the data source's SQL log level, prefixed with the
# data source name in square brackets. The text is passed through
# mask_secrets before it is logged.
#
# @param query [String] SQL text
# @return the result of @logger.log
def log_query(query)
  level = @ds.sql_log_level
  @logger.log(level) do
    "[#{@ds.name}] #{mask_secrets(query)}"
  end
end
|
#make_unique_cursor_name ⇒ Object
217
218
219
220
|
# File 'lib/bricolage/postgresconnection.rb', line 217
# Builds a cursor name from the process id, the current thread's object id
# (upper-case hex) and a per-thread sequence number.
#
# The sequence lives in Thread.current['bricolage_cursor_seq'] and is
# initialised lazily, so the method also works on a thread where nothing has
# set the counter yet (previously that raised NoMethodError on nil).
#
# @return [String] e.g. "cur_bric_<pid>_<thread id in hex>_<seq>"
def make_unique_cursor_name
  thread = Thread.current
  seq = (thread['bricolage_cursor_seq'] || 0) + 1
  thread['bricolage_cursor_seq'] = seq
  "cur_bric_#{$$}_#{'%X' % thread.object_id}_#{seq}"
end
|
#open_cursor(query, name = nil) {|Cursor.new(name, self, @logger)| ... } ⇒ Object
204
205
206
207
208
209
210
211
212
213
|
# File 'lib/bricolage/postgresconnection.rb', line 204
# Declares a cursor for the query and yields a Cursor for it.
#
# When no transaction is open, one is started with #transaction and the
# method re-enters itself inside it. The caller-supplied cursor name is
# forwarded on that re-entry (it used to be dropped, so an explicit name was
# silently replaced by a generated one).
#
# @param query [String] SQL text the cursor is declared for
# @param name [String, nil] cursor name; generated with
#   #make_unique_cursor_name when nil
# @yield [cursor] Cursor.new(name, self, @logger)
# @return the value of the block
def open_cursor(query, name = nil, &block)
  unless in_transaction?
    transaction do
      # Non-local return: leaves this method with the nested call's value.
      return open_cursor(query, name, &block)
    end
  end
  name ||= make_unique_cursor_name
  execute "declare #{name} cursor for #{query}"
  yield Cursor.new(name, self, @logger)
end
|
#query_batch(query, batch_size = 5000, &block) ⇒ Object
149
150
151
152
153
|
# File 'lib/bricolage/postgresconnection.rb', line 149
# Opens a cursor for the query and hands the caller's block to the cursor's
# each_result_set together with the batch size.
#
# @param query [String] SQL text
# @param batch_size [Integer] passed to Cursor#each_result_set (default 5000)
# @return the value of #open_cursor
def query_batch(query, batch_size = 5000, &block)
  open_cursor(query) do |cursor|
    cursor.each_result_set(batch_size, &block)
  end
end
|
#query_row(query) ⇒ Object
122
123
124
|
# File 'lib/bricolage/postgresconnection.rb', line 122
# Runs the query and returns the first row of its result.
#
# @param query [String] SQL text
# @return the first element of the result's to_a, or nil when it is empty
def query_row(query)
  execute_query(query) do |rs|
    rows = rs.to_a
    rows.first
  end
end
|
#query_rows(query) ⇒ Object
126
127
128
|
# File 'lib/bricolage/postgresconnection.rb', line 126
# Runs the query and returns all rows of its result.
#
# @param query [String] SQL text
# @return [Array] the result's to_a
def query_rows(query)
  execute_query(query, &:to_a)
end
|
#query_value(query) ⇒ Object
113
114
115
116
|
# File 'lib/bricolage/postgresconnection.rb', line 113
# Runs the query and returns the first value of the first row.
#
# @param query [String] SQL text
# @return the first of the row's values, or nil when #query_row returns
#   nothing
def query_value(query)
  first_row = query_row(query)
  return nil unless first_row

  first_row.values.first
end
|
#query_values(query) ⇒ Object
118
119
120
|
# File 'lib/bricolage/postgresconnection.rb', line 118
# Runs the query and returns the values of every row, flattened into a single
# array in row order.
#
# @param query [String] SQL text
# @return [Array]
def query_values(query)
  rows = execute_query(query) { |rs| rs.to_a }
  rows.flat_map(&:values)
end
|
#select(table, &block) ⇒ Object
109
110
111
|
# File 'lib/bricolage/postgresconnection.rb', line 109
# Runs "select * from <table>" through #execute_query, forwarding the
# caller's block.
#
# @param table table name, interpolated into the SQL text verbatim
# @return the result of #execute_query
def select(table, &block)
  sql = "select * from #{table}"
  execute_query(sql, &block)
end
|
#source ⇒ Object
46
47
48
|
# File 'lib/bricolage/postgresconnection.rb', line 46
# Exposes the underlying connection object held in @connection.
#
# @return the object stored in @connection
def source
  @connection
end
|
#transaction ⇒ Object
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/bricolage/postgresconnection.rb', line 159
# Runs the block inside a transaction.
#
# Executes 'begin transaction', yields a Transaction, and commits afterwards
# unless the transaction already reports itself as committed. When the block
# raises a StandardError, the transaction is aborted first (skipped if it is
# already committed or the connection has been flagged as failed); an error
# raised by the abort itself is logged and ignored, and the block's original
# error is re-raised.
#
# @yield [txn] the Transaction object
# @return the value of the block
def transaction
  execute 'begin transaction'
  txn = Transaction.new(self)
  begin
    yield txn
  rescue
    begin
      txn.abort unless txn.committed? || @connection_failed
    rescue => abort_error
      @logger.error "SQL error on transaction abort: #{abort_error.message} (ignored)"
    end
    # Re-raise the error that came out of the block.
    raise
  ensure
    # Also reached on non-local exits from the block (e.g. `return`).
    txn.commit unless txn.committed?
  end
end
|
#vacuum(table) ⇒ Object
249
250
251
|
# File 'lib/bricolage/postgresconnection.rb', line 249
# Issues a "vacuum" statement for the given table through #execute.
#
# @param table table name, interpolated into the SQL text verbatim
# @return the result of #execute
def vacuum(table)
  statement = "vacuum #{table};"
  execute(statement)
end
|
#vacuum_sort_only(table) ⇒ Object
253
254
255
|
# File 'lib/bricolage/postgresconnection.rb', line 253
# Issues a "vacuum sort only" statement for the given table through #execute.
#
# @param table table name, interpolated into the SQL text verbatim
# @return the result of #execute
def vacuum_sort_only(table)
  statement = "vacuum sort only #{table};"
  execute(statement)
end
|