Module: QC::Conn

Extended by:
Conn
Included in:
Conn
Defined in:
lib/queue_classic/conn.rb

Instance Method Summary

Instance Method Details

#connect ⇒ Object



68
69
70
71
72
73
74
75
76
# File 'lib/queue_classic/conn.rb', line 68

# Opens a new PostgreSQL connection using the settings derived from db_url
# and tags the session with the application name.
#
# A connection whose status is not CONNECTION_OK is logged but still used
# for the SET statement below.
#
# @return [PG::Connection] the newly opened connection
def connect
  log(:at => "establish_conn")
  # PG::Connection is the same class the connection= setter validates
  # against; the legacy PGconn constant is avoided for consistency.
  conn = PG::Connection.connect(*normalize_db_url(db_url))
  if conn.status != PG::Connection::CONNECTION_OK
    log(:error => conn.error)
  end
  # The name is spliced into a SQL string literal, so escape it: a name
  # containing an apostrophe would otherwise produce invalid SQL.
  app_name = conn.escape_string(QC::APP_NAME)
  conn.exec("SET application_name = '#{app_name}'")
  conn
end

#connection ⇒ Object



49
50
51
# File 'lib/queue_classic/conn.rb', line 49

# Returns the cached connection, calling connect to create and cache one
# when none is stored yet.
def connection
  return @connection if @connection
  @connection = connect
end

#connection=(connection) ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/queue_classic/conn.rb', line 53

# Replaces the cached connection with a caller-supplied one.
#
# @param connection [PG::Connection] the connection to store
# @raise [ArgumentError] if the argument is not a PG::Connection
def connection=(connection)
  if connection.is_a?(PG::Connection)
    @connection = connection
  else
    raise ArgumentError,
          "connection must be an instance of PG::Connection, but was #{connection.class}"
  end
end

#db_url ⇒ Object



92
93
94
95
96
97
98
# File 'lib/queue_classic/conn.rb', line 92

# Returns the parsed database URL, caching it after the first lookup.
# QC_DATABASE_URL takes precedence over DATABASE_URL.
#
# @return [URI] the parsed database URL
# @raise [ArgumentError] if neither environment variable is set
def db_url
  @db_url ||= begin
    raw = ENV["QC_DATABASE_URL"] || ENV["DATABASE_URL"]
    raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL") unless raw
    URI.parse(raw)
  end
end

#disconnect ⇒ Object



62
63
64
65
66
# File 'lib/queue_classic/conn.rb', line 62

# Closes the cached connection, if there is one, and forgets it.
#
# The cached instance variable is read directly instead of going through the
# lazy connection accessor: the accessor would open a brand-new connection
# just so it could be closed, and when disconnect runs as cleanup after a
# failed connect that second attempt would raise and hide the original error.
#
# Any error raised by finish still propagates; the cached reference is
# cleared either way.
def disconnect
  @connection.finish if @connection
ensure
  @connection = nil
end

#execute(stmt, *params) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/queue_classic/conn.rb', line 10

# Runs a SQL statement on the shared connection while holding @exec_mutex.
#
# @param stmt [String] the SQL text to run
# @param params [Array] bind parameters; nil is passed to the driver when
#   none are given
# @return [nil, Object, Array] nil when the result has no rows, the single
#   row when it has exactly one, otherwise an Array of all rows
# @raise [PG::Error] re-raised after logging it and dropping the connection
def execute(stmt, *params)
  @exec_mutex.synchronize do
    log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      params = nil if params.empty?
      rows = connection.exec(stmt, params).to_a
      rows.length > 1 ? rows : rows.first
    rescue PG::Error => e
      # PG::Error matches the PG:: namespace the connection= setter uses.
      # Drop the connection so the next call starts from a fresh one.
      log(:error => e.inspect)
      disconnect
      raise
    end
  end
end

#normalize_db_url(url) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/queue_classic/conn.rb', line 78

# Converts a parsed database URL into the positional argument list handed
# to the connection constructor.
#
# @param url [#host, #port, #path, #user, #password] the parsed URL
# @return [Array] host, port, opts, tty, database name, user, password
def normalize_db_url(url)
  # A host may be a percent-encoded socket path; turn %2F back into "/".
  socket_or_host = url.host
  socket_or_host &&= socket_or_host.gsub(/%2F/i, '/')

  port = url.port || 5432
  database = url.path.delete("/") # database name is the path minus slashes

  # The third and fourth slots are opts and tty.
  [socket_or_host, port, nil, '', database, url.user, url.password]
end

#transaction ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/queue_classic/conn.rb', line 34

# Wraps the given block in BEGIN / COMMIT.
#
# If anything is raised while the transaction is being run (including
# non-StandardError exceptions), ROLLBACK is issued and the exception is
# re-raised.
#
# @yield the work to perform inside the transaction
# @return [Object] whatever the COMMIT statement's execute call returns
def transaction
  execute("BEGIN")
  yield
  execute("COMMIT")
rescue Exception
  execute("ROLLBACK")
  raise
end

#transaction_idle? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/queue_classic/conn.rb', line 45

# Reports whether the connection's transaction status is PQTRANS_IDLE.
#
# The constant is read from PG::Connection, the same class the connection=
# setter validates against, rather than from the legacy PGconn name.
#
# @return [Boolean] true when the transaction status equals PQTRANS_IDLE
def transaction_idle?
  connection.transaction_status == PG::Connection::PQTRANS_IDLE
end

#wait(chan) ⇒ Object



27
28
29
30
31
32
# File 'lib/queue_classic/conn.rb', line 27

# Listens on a notification channel, waits up to WAIT_TIME for a
# notification, then stops listening and drains what is pending.
#
# @param chan [String, Symbol] the channel name
# @return [Object] whatever drain_notify returns
def wait(chan)
  # to_s lets Symbol names work (String + Symbol raises TypeError), and
  # doubling embedded double quotes keeps the name a single quoted SQL
  # identifier instead of terminating it early.
  ident = '"' + chan.to_s.gsub('"', '""') + '"'
  execute("LISTEN #{ident}")
  wait_for_notify(WAIT_TIME)
  execute("UNLISTEN #{ident}")
  drain_notify
end