Class: EventMachine::PostgresConnection
- Inherits:
-
Connection
- Object
- Connection
- EventMachine::PostgresConnection
- Defined in:
- lib/em-postgres/connection.rb
Constant Summary collapse
- MAX_RETRIES_ON_DEADLOCKS =
10
- DisconnectErrors =
[ 'query: not connected', 'Postgres server has gone away', 'Lost connection to Postgres server during query' ]
Instance Attribute Summary collapse
-
#connected ⇒ Object
readonly
Returns the value of attribute connected.
-
#opts ⇒ Object
(also: #settings)
readonly
Returns the value of attribute opts.
-
#processing ⇒ Object
readonly
Returns the value of attribute processing.
Class Method Summary collapse
- .connect(opts) ⇒ Object
-
.connect_socket(opts) ⇒ Object
stolen from sequel.
Instance Method Summary collapse
- #close ⇒ Object
- #execute(sql, params = nil, cblk = nil, eblk = nil, retries = 0) ⇒ Object
-
#initialize(postgres, opts, conn) ⇒ PostgresConnection
constructor
A new instance of PostgresConnection.
-
#method_missing(method, *args, &blk) ⇒ Object
act like the pg driver.
- #notify_readable ⇒ Object
- #reconnect ⇒ Object
- #unbind ⇒ Object
Constructor Details
#initialize(postgres, opts, conn) ⇒ PostgresConnection
Returns a new instance of PostgresConnection.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/em-postgres/connection.rb', line 22 def initialize(postgres,opts,conn) #def initialize(postgres,opts) begin @conn = conn @postgres = postgres @fd = postgres.socket @opts = opts @current = nil @queue = [] @processing = false @connected = true self.notify_readable = true EM.add_timer(0){ next_query } rescue => e puts e.inspect end end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &blk) ⇒ Object
act like the pg driver
203 204 205 206 207 208 209 |
# File 'lib/em-postgres/connection.rb', line 203 def method_missing(method, *args, &blk) if @postgres.respond_to? method @postgres.send(method, *args, &blk) else super end end |
Instance Attribute Details
#connected ⇒ Object (readonly)
Returns the value of attribute connected.
11 12 13 |
# File 'lib/em-postgres/connection.rb', line 11 def connected @connected end |
#opts ⇒ Object (readonly) Also known as: settings
Returns the value of attribute opts.
11 12 13 |
# File 'lib/em-postgres/connection.rb', line 11 def opts @opts end |
#processing ⇒ Object (readonly)
Returns the value of attribute processing.
11 12 13 |
# File 'lib/em-postgres/connection.rb', line 11 def processing @processing end |
Class Method Details
.connect(opts) ⇒ Object
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/em-postgres/connection.rb', line 42 def self.connect(opts) if conn = connect_socket(opts) #debug [:connect, conn.socket, opts] EM.watch(conn.socket, EventMachine::PostgresConnection,conn,opts) else # invokes :errback callback in opts before firing again debug [:reconnect] EM.add_timer(5) { connect opts } end end |
.connect_socket(opts) ⇒ Object
stolen from sequel
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/em-postgres/connection.rb', line 54 def self.connect_socket(opts) begin conn = PGconn.connect( opts[:host], (opts[:port]), #TODO deal with host and port nil,nil, opts[:database], opts[:user], opts[:password] ) # set encoding _before_ connecting if encoding = opts[:encoding] || opts[:charset] if conn.respond_to?(:set_client_encoding) conn.set_client_encoding(encoding) else conn.async_exec("set client_encoding to '#{encoding}'") end end conn rescue Exception => e puts "#{e} exception" if cb = opts[:errback] cb.call(e) nil else raise e end end end |
Instance Method Details
#close ⇒ Object
211 212 213 214 215 216 |
# File 'lib/em-postgres/connection.rb', line 211 def close return unless @connected detach @postgres.finish @connected = false end |
#execute(sql, params = nil, cblk = nil, eblk = nil, retries = 0) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/em-postgres/connection.rb', line 174 def execute(sql,params=nil, cblk = nil, eblk = nil, retries = 0) begin if not @processing or not @connected #if !@processing || !@connected @processing = true if sql =~ /\s+/ @postgres.send_query(sql,params) else @postgres.send_query_prepared(sql,params) end else @queue << [sql,params, cblk, eblk, retries] return end rescue Exception => e puts "error in execute #{e}" if DisconnectErrors.include? e. @queue << [sql,params, cblk, eblk, retries] return #close else raise e end end @current = [sql,params, cblk, eblk, retries] end |
#notify_readable ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/em-postgres/connection.rb', line 84 def notify_readable if item = @current sql,params, cblk, eblk, retries = item #results = [] #result = nil #@postgres.get_result{|r| result = r} #@postgres.get_result #TODO remove this, I can't process anymore code without this. result = nil loop do # Fetch the next result. If there isn't one, the query is # finished item = @postgres.get_result if item result = item else break end #puts "\n\nQuery result:\n%p\n" % [ result.values ] end unless @postgres. == "" #TODO this is wrong eb = (eblk || @opts[:on_error]) eb.call(result) if eb result.clear #reconnect @processing = false #@current = nil return next_query end # kick off next query in the background # as we process the current results @current = nil @processing = false cblk.call(result) if cblk result.clear next_query else return close end rescue Exception => e puts "error #{e}" if e. =~ /Deadlock/ and retries < MAX_RETRIES_ON_DEADLOCKS @queue << [sql, cblk, eblk, retries + 1] @processing = false next_query elsif DisconnectErrors.include? e. @queue << [sql,params, cblk, eblk, retries + 1] return #close elsif cb = (eblk || @opts[:on_error]) cb.call(e) @processing = false next_query else raise e end end |
#reconnect ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/em-postgres/connection.rb', line 159 def reconnect @processing = false @postgres = @conn.connect_socket(@opts) @fd = @postgres.socket @signature = EM.attach_fd(@postgres.socket, true) EM.set_notify_readable(@signature, true) EM.instance_variable_get('@conns')[@signature] = self @connected = true next_query rescue Exception => e EM.add_timer(1) { reconnect } end |
#unbind ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/em-postgres/connection.rb', line 145 def unbind # wait for the next tick until the current fd is removed completely from the reactor # # in certain cases the new FD# (@mysql.socket) is the same as the old, since FDs are re-used # without next_tick in these cases, unbind will get fired on the newly attached signature as well # # do _NOT_ use EM.next_tick here. if a bunch of sockets disconnect at the same time, we want # reconnects to happen after all the unbinds have been processed #@connected = false EM.next_tick { reconnect } end |