Class: EventMachine::Postgres
- Inherits:
-
Object
- Object
- EventMachine::Postgres
- Defined in:
- lib/em-postgres/postgres.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
self::Postgres = ::Postgres unless defined? self::Postgres.
Instance Method Summary collapse
- #close ⇒ Object
- #connect(opts) ⇒ Object
-
#connect_socket(opts) ⇒ Object
Borrowed from the Sequel library.
- #debug(data) ⇒ Object
-
#initialize(opts) ⇒ Postgres
constructor
A new instance of Postgres.
-
#method_missing(method, *args, &blk) ⇒ Object
Behaves like a normal Postgres connection.
- #query(sql, params = [], &blk) ⇒ Object (also: #real_query, #execute)
Constructor Details
#initialize(opts) ⇒ Postgres
Returns a new instance of Postgres.
45 46 47 48 49 50 51 52 53 |
# File 'lib/em-postgres/postgres.rb', line 45
# Builds the wrapper and opens the connection.
#
# @param opts [Hash] settings merged over the default { :debug => false };
#   the merged hash is stored in @settings and handed to #connect
# @raise [RuntimeError] when EM lacks .watch or PGconn lacks #socket
def initialize(opts)
  prerequisites_met = EM.respond_to?(:watch) && PGconn.method_defined?(:socket)
  raise RuntimeError, 'pg and EM.watch are required for EventedPostgres' unless prerequisites_met

  defaults = { :debug => false }
  @settings = defaults.merge(opts)
  @connection = connect(@settings)
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &blk) ⇒ Object
Behaves like a normal Postgres connection.
70 71 72 |
# File 'lib/em-postgres/postgres.rb', line 70
# Delegates every method this class does not define to the wrapped
# connection, so the object can be used like a normal postgres connection.
#
# @param method [Symbol] name of the method that was called
# @param args [Array] positional arguments of the call
# @param blk [Proc, nil] block given by the caller; forwarded to the
#   connection (it used to be accepted here and then silently dropped)
# @return [Object] whatever the wrapped connection returns
def method_missing(method, *args, &blk)
  @connection.send(method, *args, &blk)
end

# Keeps respond_to? in agreement with the delegation in method_missing.
#
# @param method [Symbol] method name being probed
# @param include_private [Boolean] whether private methods count
# @return [Boolean] true when the wrapped connection responds to the method
def respond_to_missing?(method, include_private = false)
  # @connection may still be nil while the object is being constructed.
  (@connection && @connection.respond_to?(method, include_private)) || super
end
Instance Attribute Details
#connection ⇒ Object (readonly)
self::Postgres = ::Postgres unless defined? self::Postgres
43 44 45 |
# File 'lib/em-postgres/postgres.rb', line 43
# Read-only accessor for the wrapped connection.
#
# @return [Object] the value currently stored in @connection
def connection
  @connection
end
Instance Method Details
#close ⇒ Object
55 56 57 |
# File 'lib/em-postgres/postgres.rb', line 55
# Closes the wrapped connection.
#
# @return [Object] whatever the connection's own #close returns
def close
  @connection.close
end
#connect(opts) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/em-postgres/postgres.rb', line 74
# Opens a socket via #connect_socket and registers it with EM.watch; when
# no socket could be opened, schedules another attempt five seconds later.
#
# @param opts [Hash] settings passed through to #connect_socket and EM.watch
# @return [Object] the result of EM.watch on success, otherwise the result
#   of EM.add_timer
def connect(opts)
  conn = connect_socket(opts)
  return EM.watch(conn.socket, EventMachine::PostgresConnection, conn, opts, self) if conn

  # A nil result means connect_socket already passed the error to the
  # :errback callback in opts; log and try again later.
  # NOTE(review): the value returned by the retried connect is discarded
  # here - confirm that callers do not rely on it.
  debug [:reconnect]
  EM.add_timer(5) { connect opts }
end
#connect_socket(opts) ⇒ Object
Borrowed from the Sequel library.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/em-postgres/postgres.rb', line 88
# Opens a PGconn connection and applies the requested client encoding.
#
# @param opts [Hash] connection settings; reads :host, :port, :database,
#   :user, :password, :encoding (or :charset) and :errback
# @return [Object, nil] the object returned by PGconn.connect, or nil when
#   an error occurred and was handed to the :errback callback
# @raise [StandardError] re-raises the original error when no :errback is set
def connect_socket(opts)
  conn = PGconn.connect(
    opts[:host],
    opts[:port], # TODO: deal with host and port
    nil, nil,
    opts[:database],
    opts[:user],
    opts[:password]
  )

  # The client encoding is applied right after the connection is opened.
  if (encoding = opts[:encoding] || opts[:charset])
    if conn.respond_to?(:set_client_encoding)
      conn.set_client_encoding(encoding)
    else
      conn.async_exec("set client_encoding to '#{encoding}'")
    end
  end

  conn
rescue StandardError => e
  # Rescue StandardError, not Exception: Interrupt, SystemExit and similar
  # must propagate instead of being passed to the errback and swallowed.
  puts "#{e} exception"
  errback = opts[:errback]
  raise unless errback

  errback.call(e)
  nil
end
#debug(data) ⇒ Object
138 139 140 |
# File 'lib/em-postgres/postgres.rb', line 138
# Prints data with Kernel#p, but only when the :debug setting is truthy.
#
# @param data [Object] value to inspect-print
# @return [Object, nil] data when it was printed, nil otherwise
def debug(data)
  return unless @settings[:debug]

  p data
end
#query(sql, params = [], &blk) ⇒ Object Also known as: real_query, execute
59 60 61 62 63 64 65 66 |
# File 'lib/em-postgres/postgres.rb', line 59
# Runs sql on the wrapped connection and returns a deferrable that tracks
# the outcome.
#
# @param sql [String] statement to execute
# @param params [Array] bind parameters passed along with the statement
# @param blk [Proc, nil] optional block invoked with the success result
# @return [EventMachine::DefaultDeferrable] succeeds with the result or
#   fails with the error reported by the connection
def query(sql, params = [], &blk)
  df = EventMachine::DefaultDeferrable.new

  # Previously a supplied block replaced the success callback, so the
  # returned deferrable never completed. Now the block runs first and the
  # deferrable succeeds afterwards, with the first callback argument.
  on_success = proc do |*result|
    blk.call(*result) if blk
    df.succeed(result.first)
  end
  on_failure = proc { |error| df.fail(error) }

  @connection.execute(sql, params, on_success, on_failure)
  df
end