Module: QC::Conn
Instance Method Summary collapse
- #connect ⇒ Object
- #connection ⇒ Object
- #connection=(connection) ⇒ Object
- #db_url ⇒ Object
- #disconnect ⇒ Object
- #execute(stmt, *params) ⇒ Object
- #normalize_db_url(url) ⇒ Object
- #transaction ⇒ Object
- #transaction_idle? ⇒ Boolean
- #wait(chan) ⇒ Object
Instance Method Details
#connect ⇒ Object
68 69 70 71 72 73 74 75 76 |
# File 'lib/queue_classic/conn.rb', line 68 def connect log(:at => "establish_conn") conn = PGconn.connect(*normalize_db_url(db_url)) if conn.status != PGconn::CONNECTION_OK log(:error => conn.error) end conn.exec("SET application_name = '#{QC::APP_NAME}'") conn end |
#connection ⇒ Object
49 50 51 |
# File 'lib/queue_classic/conn.rb', line 49 def connection @connection ||= connect end |
#connection=(connection) ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/queue_classic/conn.rb', line 53 def connection=(connection) unless connection.is_a? PG::Connection c = connection.class err = "connection must be an instance of PG::Connection, but was #{c}" raise(ArgumentError, err) end @connection = connection end |
#db_url ⇒ Object
92 93 94 95 96 97 98 |
# File 'lib/queue_classic/conn.rb', line 92 def db_url return @db_url if @db_url url = ENV["QC_DATABASE_URL"] || ENV["DATABASE_URL"] || raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL") @db_url = URI.parse(url) end |
#disconnect ⇒ Object
62 63 64 65 66 |
# File 'lib/queue_classic/conn.rb', line 62 def disconnect begin connection.finish ensure @connection = nil end end |
#execute(stmt, *params) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/queue_classic/conn.rb', line 10 def execute(stmt, *params) @exec_mutex.synchronize do log(:at => "exec_sql", :sql => stmt.inspect) begin params = nil if params.empty? r = connection.exec(stmt, params) result = [] r.each {|t| result << t} result.length > 1 ? result : result.pop rescue PGError => e log(:error => e.inspect) disconnect raise end end end |
#normalize_db_url(url) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/queue_classic/conn.rb', line 78 def normalize_db_url(url) host = url.host host = host.gsub(/%2F/i, '/') if host [ host, # host or percent-encoded socket path url.port || 5432, nil, '', #opts, tty url.path.gsub("/",""), # database name url.user, url.password ] end |
#transaction ⇒ Object
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/queue_classic/conn.rb', line 34 def transaction begin execute("BEGIN") yield execute("COMMIT") rescue Exception execute("ROLLBACK") raise end end |
#transaction_idle? ⇒ Boolean
45 46 47 |
# File 'lib/queue_classic/conn.rb', line 45 def transaction_idle? connection.transaction_status == PGconn::PQTRANS_IDLE end |
#wait(chan) ⇒ Object
27 28 29 30 31 32 |
# File 'lib/queue_classic/conn.rb', line 27 def wait(chan) execute('LISTEN "' + chan + '"') wait_for_notify(WAIT_TIME) execute('UNLISTEN "' + chan + '"') drain_notify end |