Class: Bricolage::PostgresConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/bricolage/postgresconnection.rb

Defined Under Namespace

Classes: Cursor, Transaction

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, ds, logger) ⇒ PostgresConnection

Returns a new instance of PostgresConnection.



38
39
40
41
42
43
44
# File 'lib/bricolage/postgresconnection.rb', line 38

def initialize(connection, ds, logger)
  @connection = connection
  @ds = ds
  @logger = logger
  @closed = false
  @connection_failed = false
end

Class Method Details

.install_signal_handlersObject



10
11
12
13
14
15
# File 'lib/bricolage/postgresconnection.rb', line 10

def PostgresConnection.install_signal_handlers
  Signal.trap(:TERM) {
    $stderr.puts 'receive SIGTERM'
    raise Interrupt, 'SIGTERM'
  }
end

.open_data_source(ds) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/bricolage/postgresconnection.rb', line 17

def PostgresConnection.open_data_source(ds)
  conn = _open_ds(ds)
  if block_given?
    begin
      yield conn
    ensure
      conn.close_force
    end
  else
    return conn
  end
end

Instance Method Details

#analyze(table) ⇒ Object



257
258
259
# File 'lib/bricolage/postgresconnection.rb', line 257

def analyze(table)
  execute "analyze #{table};"
end

#cancelObject



69
70
71
72
73
74
75
76
77
78
# File 'lib/bricolage/postgresconnection.rb', line 69

def cancel
  @logger.info "cancelling PostgreSQL query..."
  err = @connection.cancel
  if err
    @logger.error "could not cancel query: #{err}"
    raise PostgreSQLException, "could not cancel query: #{err}"
  else
    @logger.info "successfully cancelled"
  end
end

#cancel_forceObject



64
65
66
67
# File 'lib/bricolage/postgresconnection.rb', line 64

def cancel_force
  cancel
rescue PostgreSQLException
end

#closeObject



50
51
52
53
# File 'lib/bricolage/postgresconnection.rb', line 50

def close
  @connection.close
  @closed = true
end

#close_forceObject



55
56
57
58
# File 'lib/bricolage/postgresconnection.rb', line 55

def close_force
  close
rescue
end

#closed?Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/bricolage/postgresconnection.rb', line 60

def closed?
  @closed
end

#drop_table(name) ⇒ Object



241
242
243
# File 'lib/bricolage/postgresconnection.rb', line 241

def drop_table(name)
  execute "drop table #{name} cascade;"
end

#drop_table_force(name) ⇒ Object



245
246
247
# File 'lib/bricolage/postgresconnection.rb', line 245

def drop_table_force(name)
  execute "drop table if exists #{name} cascade;"
end

#execute_query(query, &block) ⇒ Object Also known as: query



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/bricolage/postgresconnection.rb', line 130

def execute_query(query, &block)
  log_query query
  rs = log_elapsed_time {
    querying {
      @connection.async_exec(query)
    }
  }
  return (yield rs)
rescue PG::ConnectionBad, PG::UnableToSend => ex
  @connection_failed = true
  raise ConnectionError.wrap(ex)
rescue PG::Error => ex
  raise PostgreSQLException.wrap(ex)
ensure
  rs.clear if rs
end

#execute_update(query) ⇒ Object Also known as: execute, update



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/bricolage/postgresconnection.rb', line 89

def execute_update(query)
  log_query query
  rs = log_elapsed_time {
    querying {
      @connection.async_exec(query)
    }
  }
  return rs.to_a
rescue PG::ConnectionBad, PG::UnableToSend => ex
  @connection_failed = true
  raise ConnectionError.wrap(ex)
rescue PG::Error => ex
  raise PostgreSQLException.wrap(ex)
ensure
  rs.clear if rs
end

#in_transaction?Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/bricolage/postgresconnection.rb', line 155

def in_transaction?
  @connection.transaction_status == PG::Constants::PQTRANS_INTRANS
end

#lock(table) ⇒ Object



261
262
263
# File 'lib/bricolage/postgresconnection.rb', line 261

def lock(table)
  execute("lock #{table}")
end

#log_elapsed_timeObject



274
275
276
277
278
279
280
281
# File 'lib/bricolage/postgresconnection.rb', line 274

def log_elapsed_time
  b = Time.now
  return yield
ensure
  e = Time.now
  t = e - b
  @logger.log(@ds.sql_log_level) { "#{'%.1f' % t} secs" }
end

#log_query(query) ⇒ Object



265
266
267
# File 'lib/bricolage/postgresconnection.rb', line 265

def log_query(query)
  @logger.log(@ds.sql_log_level) { "[#{@ds.name}] #{mask_secrets query}" }
end

#make_unique_cursor_nameObject



217
218
219
220
# File 'lib/bricolage/postgresconnection.rb', line 217

def make_unique_cursor_name
  seq = (Thread.current['bricolage_cursor_seq'] += 1)
  "cur_bric_#{$$}_#{'%X' % Thread.current.object_id}_#{seq}"
end

#open_cursor(query, name = nil) {|Cursor.new(name, self, @logger)| ... } ⇒ Object

Yields:

  • (Cursor.new(name, self, @logger))


204
205
206
207
208
209
210
211
212
213
# File 'lib/bricolage/postgresconnection.rb', line 204

def open_cursor(query, name = nil, &block)
  unless in_transaction?
    transaction {
      return open_cursor(query, &block)
    }
  end
  name ||= make_unique_cursor_name
  execute "declare #{name} cursor for #{query}"
  yield Cursor.new(name, self, @logger)
end

#query_batch(query, batch_size = 5000, &block) ⇒ Object



149
150
151
152
153
# File 'lib/bricolage/postgresconnection.rb', line 149

def query_batch(query, batch_size = 5000, &block)
  open_cursor(query) {|cur|
    cur.each_result_set(batch_size, &block)
  }
end

#query_row(query) ⇒ Object



122
123
124
# File 'lib/bricolage/postgresconnection.rb', line 122

def query_row(query)
  execute_query(query) {|rs| rs.to_a.first }
end

#query_rows(query) ⇒ Object



126
127
128
# File 'lib/bricolage/postgresconnection.rb', line 126

def query_rows(query)
  execute_query(query) {|rs| rs.to_a }
end

#query_value(query) ⇒ Object



113
114
115
116
# File 'lib/bricolage/postgresconnection.rb', line 113

def query_value(query)
  row = query_row(query)
  row ? row.values.first : nil
end

#query_values(query) ⇒ Object



118
119
120
# File 'lib/bricolage/postgresconnection.rb', line 118

def query_values(query)
  execute_query(query) {|rs| rs.to_a }.flat_map {|rec| rec.values }
end

#select(table, &block) ⇒ Object



109
110
111
# File 'lib/bricolage/postgresconnection.rb', line 109

def select(table, &block)
  execute_query("select * from #{table}", &block)
end

#sourceObject



46
47
48
# File 'lib/bricolage/postgresconnection.rb', line 46

def source
  @connection
end

#transactionObject



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/bricolage/postgresconnection.rb', line 159

def transaction
  execute 'begin transaction'
  txn = Transaction.new(self)
  begin
    yield txn
  rescue
    begin
      if not txn.committed? and not @connection_failed
        txn.abort
      end
    rescue => ex
      @logger.error "SQL error on transaction abort: #{ex.message} (ignored)"
    end
    raise
  ensure
    txn.commit unless txn.committed?
  end
end

#vacuum(table) ⇒ Object



249
250
251
# File 'lib/bricolage/postgresconnection.rb', line 249

def vacuum(table)
  execute "vacuum #{table};"
end

#vacuum_sort_only(table) ⇒ Object



253
254
255
# File 'lib/bricolage/postgresconnection.rb', line 253

def vacuum_sort_only(table)
  execute "vacuum sort only #{table};"
end