Module: QC::Conn

Extended by:
Conn
Included in:
Conn
Defined in:
lib/queue_classic/conn.rb

Instance Method Summary collapse

Instance Method Details

#connectObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/queue_classic/conn.rb', line 86

def connect
  log(:at => "establish_conn")
  conn = PGconn.connect(
    db_url.host.gsub(/%2F/i, '/'), # host or percent-encoded socket path
    db_url.port || 5432,
    nil, '', #opts, tty
    db_url.path.gsub("/",""), # database name
    db_url.user,
    db_url.password
  )
  if conn.status != PGconn::CONNECTION_OK
    log(:error => conn.error)
  end
  conn
end

#connectionObject



67
68
69
# File 'lib/queue_classic/conn.rb', line 67

def connection
  @connection ||= connect
end

#connection=(connection) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/queue_classic/conn.rb', line 71

def connection=(connection)
  unless connection.instance_of? PG::Connection
    c = connection.class
    err = "connection must be an instance of PG::Connection, but was #{c}"
    raise(ArgumentError, err)
  end
  @connection = connection
end

#db_urlObject



102
103
104
105
106
107
108
# File 'lib/queue_classic/conn.rb', line 102

def db_url
  return @db_url if @db_url
  url = ENV["QC_DATABASE_URL"] ||
        ENV["DATABASE_URL"]    ||
        raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL")
  @db_url = URI.parse(url)
end

#disconnectObject



80
81
82
83
84
# File 'lib/queue_classic/conn.rb', line 80

def disconnect
  begin connection.finish
  ensure @connection = nil
  end
end

#drain_notifyObject



40
41
42
43
44
# File 'lib/queue_classic/conn.rb', line 40

def drain_notify
  until connection.notifies.nil?
    log(:at => "drain_notifications")
  end
end

#execute(stmt, *params) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/queue_classic/conn.rb', line 8

def execute(stmt, *params)
  @exec_mutex.synchronize do
    log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      params = nil if params.empty?
      r = connection.exec(stmt, params)
      result = []
      r.each {|t| result << t}
      result.length > 1 ? result : result.pop
    rescue PGError => e
      log(:error => e.inspect)
      disconnect
      raise
    end
  end
end

#listen(chan) ⇒ Object



30
31
32
33
# File 'lib/queue_classic/conn.rb', line 30

def listen(chan)
  log(:at => "LISTEN")
  execute('LISTEN "' + chan + '"') #quotes matter
end

#log(msg) ⇒ Object



110
111
112
# File 'lib/queue_classic/conn.rb', line 110

def log(msg)
  QC.log(msg)
end

#notify(chan) ⇒ Object



25
26
27
28
# File 'lib/queue_classic/conn.rb', line 25

def notify(chan)
  log(:at => "NOTIFY")
  execute('NOTIFY "' + chan + '"') #quotes matter
end

#transactionObject



52
53
54
55
56
57
58
59
60
61
# File 'lib/queue_classic/conn.rb', line 52

def transaction
  begin
    execute("BEGIN")
    yield
    execute("COMMIT")
  rescue Exception
    execute("ROLLBACK")
    raise
  end
end

#transaction_idle?Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/queue_classic/conn.rb', line 63

def transaction_idle?
  connection.transaction_status == PGconn::PQTRANS_IDLE
end

#unlisten(chan) ⇒ Object



35
36
37
38
# File 'lib/queue_classic/conn.rb', line 35

def unlisten(chan)
  log(:at => "UNLISTEN")
  execute('UNLISTEN "' + chan + '"') #quotes matter
end

#wait_for_notify(t) ⇒ Object



46
47
48
49
50
# File 'lib/queue_classic/conn.rb', line 46

def wait_for_notify(t)
  connection.wait_for_notify(t) do |event, pid, msg|
    log(:at => "received_notification")
  end
end