em-pg-client
Author: Rafał Michalski ([email protected])
DESCRIPTION
em-pg-client is the Ruby and EventMachine driver interface to the PostgreSQL RDBMS. It is based on ruby-pg.
FEATURES
-
Non-blocking / asynchronous processing with EventMachine,
-
fully async auto re-connects on connection losses (e.g.: RDBMS restarts),
-
minimal changes to PG::Connection API,
-
configurable timeouts (connect or execute) of asynchronous processing,
-
additional Fiber-aware version supporting EM-Synchrony.
BUGS/LIMITATIONS
-
no async support for: COPY commands (get_copy_data, put_copy_data), wait_for_notify and transaction,
-
currently no ActiveRecord or Sequel support (you are welcome to contribute),
-
doesn’t work on Windows (issue #7)
API Changes between versions
0.1.x -> 0.2.x
-
on_reconnect renamed to the more accurate on_autoreconnect (it is not used by the PG::EM::Client#reset call),
-
async_autoreconnect is false by default if on_autoreconnect is not specified as an initialization option.
TODO:
-
implement EM adapted versions of get_copy_data, put_copy_data, wait_for_notify and transaction
-
add some fd/socket hackery to get it working on Windows (issue #7)
-
em-synchrony ORM (ActiveRecord, Sequel and maybe Datamapper) support as separate projects
-
present more benchmarks
REQUIREMENTS
-
ruby >= 1.9 (tested: 1.9.3-p194, 1.9.2-p320, 1.9.1-p378)
-
bitbucket.org/ged/ruby-pg >= 0.13.2 (>= 0.14 recommended)
-
PostgreSQL RDBMS >= 8.3
-
rubyeventmachine.com >= 0.12.10
-
(optional) EM-Synchrony
INSTALL
$ [sudo] gem install em-pg-client
Gemfile
# eventmachine
gem "em-pg-client", "~> 0.2.1", :require => 'pg/em'
# em-synchrony
gem "em-pg-client", "~> 0.2.1", :require => ['pg/em', 'em-synchrony/pg']
Github
git clone git://github.com/royaltm/ruby-em-pg-client.git
WHY?
Because I didn’t find any EM implementation of ruby-pg that fit my needs. I’ve found at least 3 other implementations of an EM postgres client and (except the EM-bundled one, which uses the no longer maintained postgres-pr library) all of them have similar flaws:
-
2 of them are designed to support some ORM (ActiveRecord or Sequel), so they are EM-Synchrony only,
-
non-standard API method names,
-
no (nonexistent or non-working) autoreconnect implementation,
-
poor error handling,
-
not fully supporting asynchronous PG::Connection API.
The last one is worth some comment:
They all use blocking methods to retrieve the whole result from the server (PGConn#block, or PGConn#get_result, which also blocks when there is not enough buffered data on the socket).
This implementation uses the non-blocking PGConn#is_busy and PGConn#consume_input methods instead. Depending on the size of the query results and the level of concurrency, the gain in overall speed and responsiveness of your application can be substantial. I’ve done some tests already.
Thanks
The greetz go to:
-
Authors of the pg driver (especially for its async-api)
-
Francis Cianfrocca for great reactor framework (EventMachine)
-
Ilya Grigorik (igrigorik) for (untangling) EM with Fibers
USAGE
em-pg-client provides the PG::EM::Client class, which inherits from PG::Connection. You can work with PG::EM::Client almost the same way you would with PG::Connection.
The real difference begins when you turn the EventMachine reactor on.
BASIC
require 'pg/em'
# no async
pg = PG::EM::Client.new dbname: 'test'
pg.query('select * from foo') do |result|
  puts Array(result).inspect
end
# asynchronous
EM.run do
  df = pg.query('select * from foo')
  df.callback { |result|
    puts Array(result).inspect
    EM.stop
  }
  df.errback { |ex|
    raise ex
  }
  puts "sent"
end
# alternatively
EM.run do
  pg.query('select * from foo') do |result|
    raise result if result.is_a? ::Exception
    puts Array(result).inspect
    EM.stop
  end
  puts "sent"
end
PG::Connection methods adapted to EventMachine
The list of PG::EM::Client async methods for processing with EventMachine.
1. Async methods (always returning a Deferrable object):
-
Client.async_connect (singleton)
-
async_reset
-
async_exec (alias: async_query)
-
async_prepare
-
async_exec_prepared
-
async_describe_prepared
-
async_describe_portal
For the arguments of these methods consult their original blocking counterparts (without the async_ prefix) in the PG::Connection manual.
Use callback on the returned Deferrable to receive the result. The result you receive is a PG::EM::Client for PG::EM::Client.async_connect and async_reset, and a PG::Result for the rest of the methods. The received PG::EM::Client is in a connected state and ready for queries. You need to clear the obtained PG::Result object yourself or leave it to the gc.
To detect a failure in an executed method use errback on the returned Deferrable. You should expect an instance of Exception (usually PG::Error) as the errback argument. You may check its backtrace to find the origin of the error.
2. Async / blocking methods (returning a Deferrable only when EM is running):
-
exec (alias: query)
-
prepare
-
exec_prepared
-
describe_prepared
-
describe_portal
Outside EventMachine’s event loop these methods are regular, blocking PG::Connection methods.
All the methods (1 & 2) accept a block argument, which they attach to both the callback and errback hooks of the returned Deferrable.
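Because the same block is attached to both hooks, it receives either a result or an exception and has to tell them apart. The following is a minimal sketch of that behaviour in plain Ruby, without EventMachine or a database. TinyDeferrable, fake_query and run_block_demo are hypothetical names invented for this illustration and are not part of em-pg-client.

```ruby
# Hypothetical stand-in for an EventMachine Deferrable (not em-pg-client code).
class TinyDeferrable
  def initialize
    @callbacks = []
    @errbacks = []
  end

  # Register a block to run on success.
  def callback(&blk)
    @callbacks << blk
    self
  end

  # Register a block to run on failure.
  def errback(&blk)
    @errbacks << blk
    self
  end

  def succeed(value)
    @callbacks.each { |cb| cb.call(value) }
  end

  def fail(error)
    @errbacks.each { |eb| eb.call(error) }
  end
end

# Hypothetical helper: attaches one block to both hooks, as described above.
def fake_query(&blk)
  df = TinyDeferrable.new
  if blk
    df.callback(&blk)
    df.errback(&blk)
  end
  df
end

# Runs one successful and one failed "query" through the same block.
def run_block_demo
  seen = []
  handler = proc do |result|
    if result.is_a?(::Exception)
      seen << [:error, result.message]
    else
      seen << [:ok, result]
    end
  end
  fake_query(&handler).succeed([{ 'id' => '1' }])
  fake_query(&handler).fail(RuntimeError.new('connection lost'))
  seen
end

p run_block_demo
```

This is why the block-style examples in this document start with a check such as raise result if result.is_a? ::Exception.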
You may also mix async and blocking methods without closing the connection. You only need to start/stop EventMachine in between async calls.
Special options
There are 3 additional connection options and 1 standard pg option used by async methods. You may add them as one of the hash options to PG::EM::Client.new or PG::EM::Client.async_connect or simply use accessor methods to change them on the fly. The additional options are not passed to libpq.
The options are:
-
async_autoreconnect (true/false, default false unless on_autoreconnect is specified) allows automatic re-connection when there was a problem with the connection to the server,
-
on_autoreconnect (nil/Proc, default nil) a hook which is called after auto-reconnecting,
-
query_timeout (Float/Fixnum, default 0) sets the timeout for query execution,
-
connect_timeout (Float/Fixnum, default 0) sets the timeout for establishing and resetting the connection.
Only connect_timeout is a standard libpq option, although changing it by accessor method only affects asynchronous functions.
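The division between the two groups of options can be pictured with a small plain-Ruby sketch. It only illustrates the rules stated above (three client-only options, connect_timeout shared with libpq, and the async_autoreconnect default). It is an assumption about how such a split could be written and not the library's actual code. SPECIAL_OPTIONS and split_options are hypothetical names.

```ruby
# Hypothetical names; an illustration, not the library's internals.
# The three options that exist only in PG::EM::Client (per the list above).
SPECIAL_OPTIONS = [:async_autoreconnect, :on_autoreconnect, :query_timeout].freeze

# Returns [client_settings, libpq_options] for a given options hash.
def split_options(opts)
  special, libpq = opts.partition { |key, _| SPECIAL_OPTIONS.include?(key) }.map(&:to_h)
  settings = {
    on_autoreconnect: special[:on_autoreconnect],       # default nil
    query_timeout:    special.fetch(:query_timeout, 0), # default 0
    # connect_timeout is a standard option, so it also stays in the libpq hash
    connect_timeout:  libpq.fetch(:connect_timeout, 0)
  }
  # false by default, unless an on_autoreconnect hook was given
  settings[:async_autoreconnect] =
    special.fetch(:async_autoreconnect) { !settings[:on_autoreconnect].nil? }
  [settings, libpq]
end

settings, libpq = split_options(dbname: 'test', connect_timeout: 5, query_timeout: 50)
p settings
p libpq
```

In this sketch query_timeout never reaches the libpq hash, while connect_timeout appears in both places.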
AUTORECONNECTING IN ASYNC MODE
Autoreconnecting is done in a non-blocking manner, using async_reset internally.
EM.run do
  pg = PG::EM::Client.new dbname: 'test',
    connect_timeout: 5, query_timeout: 50,
    async_autoreconnect: true
  try_query = lambda do |&blk|
    pg.query('select * from foo') do |result|
      raise result if result.is_a? ::Exception
      puts Array(result).inspect
      blk.call
    end
  end
  try_query.call {
    system 'pg_ctl stop -m fast'
    system 'pg_ctl start -w'
    try_query.call { EM.stop }
  }
end
To enable this feature call:
pg.async_autoreconnect = true
or
pg = PG::EM::Client.new dbname: 'test',
  async_autoreconnect: true
It’s also possible to define an on_autoreconnect callback to be invoked after the connection has been reset. It is called just before the send query command is executed:
EM.run do
  pg = PG::EM::Client.new dbname: 'test',
    async_autoreconnect: true
  pg.prepare('bar', 'select * from foo order by cdate desc') do
    pg.on_autoreconnect = proc { |c, e|
      c.prepare('bar', 'select * from foo order by cdate desc')
    }
    try_query = lambda do |&blk|
      pg.exec_prepared('bar') do |result|
        raise result if result.is_a? ::Exception
        puts Array(result).inspect
        blk.call
      end
    end
    try_query.call {
      system 'pg_ctl stop -m fast'
      system 'pg_ctl start -w'
      try_query.call { EM.stop }
    }
  end
end
As you can see, it’s possible to send an async query from inside the on_autoreconnect proc. However, you have to pass the Deferrable from the async callback to the caller. See the PG::EM::Client#on_autoreconnect docs for details.
TRUE ASYNC
For a non-blocking connect use PG::EM::Client.async_connect, and PG::EM::Client#async_reset for an asynchronous re-connect. Like other async methods they return a Deferrable object. Use the Deferrable’s #callback to obtain an already connected PG::EM::Client.
EM.run do
  pool = (1..10).map {
    PG::EM::Client.async_connect dbname: 'test',
      connect_timeout: 5, query_timeout: 50 }
  togo = pool.length
  pool.each_with_index do |df, i|
    df.callback do |pg|
      pg.query("select * from foo") do |result|
        puts "recv: #{i}"
        EM.stop if (togo-=1).zero?
      end
      puts "sent: #{i}"
    end
    df.errback { |ex| raise ex }
  end
end
Fibers / EM-Synchrony
There is a special version of PG::EM::Client library with fiber aware methods for EM-Synchrony or other implementations of Fiber untangled EventMachine.
The require string is “em-synchrony/pg” instead of “pg/em”.
The em-synchrony/pg version of PG::EM::Client.new is fully asynchronous and blocks only the current fiber. This also applies to PG::EM::Client#reset.
require 'em-synchrony'
require 'em-synchrony/pg'
EM.synchrony do
  pg = PG::EM::Client.new dbname: 'test'
  pg.query('select * from foo') do |result|
    puts Array(result).inspect
  end
  EM.stop
end
Although em-synchrony provides a very nice set of tools for untangled EventMachine, you don’t actually need it to fully benefit from this version of PG::EM::Client.
PG::Connection methods adapted to EM-Synchrony
The list of PG::EM::Client fiber aware methods for processing with EM-Synchrony / EventMachine.
All async_* methods are exactly the same as in the pure EventMachine version of PG::EM::Client.
The fiber aware methods are:
-
Client.connect (singleton, alias: new, open, setdb, setdblogin)
-
reset
-
exec (alias: query)
-
prepare
-
exec_prepared
-
describe_prepared
-
describe_portal
Under the hood, these methods call their async counterparts and yield from the current fiber, awaiting the result. The PG::Result (or PG::EM::Client for connect and reset) is then returned to the caller. If a code block was given, it is executed with the result as the argument. In that case the value of the block is returned instead and the PG::Result is cleared (or, in the case of connect or reset, the PG::EM::Client is closed) after executing the block. From a single fiber’s point of view, they behave like regular blocking PG::Connection methods.
Each of them also detects automatically whether EventMachine is running. If called outside the EM event loop they are exactly the original methods of PG::Connection.
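The yield-and-resume pattern described above can be sketched with nothing but Ruby's Fiber class. The example below is a simplified model and not the library's implementation: OneShotDeferrable, await and run_fiber_demo are hypothetical names, and a plain array of pending jobs stands in for the EventMachine reactor.

```ruby
require 'fiber' # makes Fiber.current available on older Rubies

# Hypothetical stand-in for an EM Deferrable; not part of em-pg-client.
class OneShotDeferrable
  def callback(&blk)
    @callback = blk
    self
  end

  def errback(&blk)
    @errback = blk
    self
  end

  def succeed(value)
    @callback.call(value) if @callback
  end

  def fail(error)
    @errback.call(error) if @errback
  end
end

# Suspends the current fiber until the deferrable fires, then returns the
# result or raises the error, so the caller sees an ordinary "blocking" call.
def await(deferrable)
  fiber = Fiber.current
  deferrable.callback { |result| fiber.resume(result) }
  deferrable.errback  { |error|  fiber.resume(error) }
  outcome = Fiber.yield
  raise outcome if outcome.is_a?(::Exception)
  outcome
end

def run_fiber_demo
  pending = [] # simulated reactor: deferrables waiting to be completed
  log = []

  Fiber.new do
    df = OneShotDeferrable.new
    pending << [df, :succeed, 'rows']
    log << "got #{await(df)}"

    df = OneShotDeferrable.new
    pending << [df, :fail, RuntimeError.new('boom')]
    begin
      await(df)
    rescue RuntimeError => e
      log << "rescued #{e.message}"
    end
  end.resume

  # "Reactor loop": completing a deferrable resumes the waiting fiber.
  until pending.empty?
    df, action, value = pending.shift
    df.public_send(action, value)
  end
  log
end

p run_fiber_demo
```

Inside the fiber the code reads top to bottom and errors surface as ordinary exceptions, which is the behaviour the Handling errors example below relies on.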
As in the pure EventMachine version, you can mix async, fiber aware and blocking methods without finishing the connection. You only need to start/stop EventMachine in between async calls.
Handling errors
EM.synchrony do
  # pg is a PG::EM::Client created as in the examples above
  begin
    pg.query('select * from foo') do |result|
      puts result
    end
  rescue PG::Error => e
    puts "PSQL error: #{e.inspect}"
  end
  EM.stop
end
Parallel async queries
EM.synchrony do
  pg = EM::Synchrony::ConnectionPool.new(size: 2) do
    PG::EM::Client.new :dbname => 'test'
  end
  multi = EventMachine::Synchrony::Multi.new
  multi.add :foo, pg.aquery('select * from foo') # or #async_query()
  multi.add :bar, pg.aquery('select * from bar') # #aquery() is just an alias
  res = multi.perform
  p res
  EM.stop
end
Fiber Concurrency
EM.synchrony do
  # use ConnectionPool when more Fibers will be querying at the same time!
  pg = EM::Synchrony::ConnectionPool.new(size: 5) do
    PG::EM::Client.new :dbname => 'test'
  end
  counter = 0
  EM::Synchrony::FiberIterator.new(['select * from foo']*10, 5) do |query|
    i = counter
    pg.query(query) do |result|
      puts "recv: #{i}"
    end
    puts "sent: #{i}"
    counter += 1
  end
  EM.stop
end
Async reconnect with on_autoreconnect callback
EM.synchrony do
  on_autoreconnect = proc do |c, e|
    c.prepare('bar', 'select * from foo order by cdate desc')
  end
  pg = EM::Synchrony::ConnectionPool.new(size: 5) do
    p = PG::EM::Client.new dbname: 'test', on_autoreconnect: on_autoreconnect
    on_autoreconnect.call p
    p
  end
  try_query = lambda do
    pg.exec_prepared('bar') do |result|
      raise result if result.is_a? ::Exception
      puts Array(result).inspect
    end
  end
  try_query.call
  system 'pg_ctl stop -m fast'
  system 'pg_ctl start -w'
  try_query.call
  EM.stop
end
Specifying on_autoreconnect as a PG::EM::Client.new initialization option implicitly enables async_autoreconnect.
LICENCE
The MIT License - Copyright © 2012 Rafał Michalski