Class: EventMachine::Protocols::PostgresConnection
- Inherits:
-
Connection
- Object
- Connection
- EventMachine::Protocols::PostgresConnection
- Includes:
- PostgresPR
- Defined in:
- lib/postgres_connection.rb
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #connect(db, user, psw = nil) ⇒ Object
-
#dispatch_conn_message(msg) ⇒ Object
Cloned and modified from the postgres-pr.
-
#dispatch_query_message(msg) ⇒ Object
Cloned and modified from the postgres-pr.
-
#exec(sql) ⇒ Object
Fibered impl for synchronous execution of SQL within EM.
-
#initialize ⇒ PostgresConnection
constructor
A new instance of PostgresConnection.
- #post_init ⇒ Object
- #query(sql) ⇒ Object
- #receive_data(data) ⇒ Object
- #unbind ⇒ Object
Constructor Details
#initialize ⇒ PostgresConnection
Returns a new instance of PostgresConnection.
33 34 35 36 37 |
# File 'lib/postgres_connection.rb', line 33 def initialize @data = "" @params = {} @connected = false end |
Instance Method Details
#close ⇒ Object
63 64 65 |
# File 'lib/postgres_connection.rb', line 63 def close close_connection end |
#closed? ⇒ Boolean
67 68 69 |
# File 'lib/postgres_connection.rb', line 67 def closed? !@connected end |
#connect(db, user, psw = nil) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/postgres_connection.rb', line 82 def connect(db, user, psw=nil) d = EM::DefaultDeferrable.new d.timeout 15 if @pending_query || @pending_conn d.fail "Operation already in progress" else @pending_conn = d prms = {"user"=>user, "database"=>db} @user = user if psw @password = psw #prms["password"] = psw end send_data PostgresPR::StartupMessage.new( 3 << 16, prms ).dump end d end |
#dispatch_conn_message(msg) ⇒ Object
Cloned and modified from the postgres-pr.
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/postgres_connection.rb', line 141 def (msg) case msg when AuthentificationClearTextPassword raise ArgumentError, "no password specified" if @password.nil? send_data PasswordMessage.new(@password).dump when AuthentificationCryptPassword raise ArgumentError, "no password specified" if @password.nil? send_data PasswordMessage.new(@password.crypt(msg.salt)).dump when AuthentificationMD5Password raise ArgumentError, "no password specified" if @password.nil? require 'digest/md5' m = Digest::MD5.hexdigest(@password + @user) m = Digest::MD5.hexdigest(m + msg.salt) m = 'md5' + m send_data PasswordMessage.new(m).dump when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential raise "unsupported authentification" when AuthentificationOk when ErrorResponse raise msg.field_values.join("\t") when NoticeResponse @notice_processor.call(msg) if @notice_processor when ParameterStatus @params[msg.key] = msg.value when BackendKeyData # TODO #p msg when ReadyForQuery # TODO: use transaction status pc,@pending_conn = @pending_conn,nil pc.succeed true else raise "unhandled message type" end end |
#dispatch_query_message(msg) ⇒ Object
Cloned and modified from the postgres-pr.
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/postgres_connection.rb', line 183 def (msg) case msg when DataRow @r.rows << msg.columns when CommandComplete @r.cmd_tag = msg.cmd_tag when ReadyForQuery pq,@pending_query = @pending_query,nil pq.succeed @e.size == 0, @r, @e when RowDescription @r.fields = msg.fields when CopyInResponse when CopyOutResponse when EmptyQueryResponse when ErrorResponse @e << msg.field_values[2] when NoticeResponse @notice_processor.call(msg) if @notice_processor when ParameterStatus else # TODO puts "Unknown Postgres message: #{msg}" end end |
#exec(sql) ⇒ Object
Fibered impl for synchronous execution of SQL within EM
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/postgres_connection.rb', line 40 def exec(sql) fiber = Fiber.current # p [fiber.object_id, self.object_id, sql] yielding = true (status, result, errors) = nil d = query(sql) d.callback do |s, r, e| (status, result, errors) = s, r, e fiber.resume end d.errback do |msg| errors = msg status = false # errback is called from the same fiber yielding = false end Fiber.yield if yielding # p [fiber.object_id, self.object_id, result] return PGresult.new(result) if status raise RuntimeError, (errors || result).inspect end |
#post_init ⇒ Object
71 72 73 |
# File 'lib/postgres_connection.rb', line 71 def post_init @connected = true end |
#query(sql) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/postgres_connection.rb', line 102 def query(sql) d = EM::DefaultDeferrable.new d.timeout 15 if !@connected d.fail "Not connected" elsif @pending_query || @pending_conn d.fail "Operation already in progress" else @r = PostgresPR::Connection::Result.new @e = [] @pending_query = d send_data PostgresPR::Query.dump(sql) end d end |
#receive_data(data) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/postgres_connection.rb', line 120 def receive_data(data) @data << data while @data.length >= 5 pktlen = @data[1...5].unpack("N").first if @data.length >= (1 + pktlen) pkt = @data.slice!(0...(1+pktlen)) m = StringIO.open( pkt, "r" ) {|io| PostgresPR::Message.read( io ) } if @pending_conn m elsif @pending_query m else raise "Unexpected message from database" end else break # very important, break out of the while end end end |
#unbind ⇒ Object
75 76 77 78 79 80 |
# File 'lib/postgres_connection.rb', line 75 def unbind @connected = false if o = (@pending_query || @pending_conn) o.succeed false, "lost connection" end end |