Class: PG::EM::Client
- Inherits: Connection
  - Object
  - Connection
  - PG::EM::Client
- Defined in: lib/pg/em.rb, lib/pg/em/client/watcher.rb, lib/pg/em/client/connect_watcher.rb
Overview
PostgreSQL EventMachine client
- Author: Rafal Michalski
Client is a PG::Connection wrapper designed for EventMachine.
The following new methods:
- #exec_defer (alias: query_defer)
are added to execute queries asynchronously, returning a Deferrable object.
The following methods of PG::Connection are overloaded:
- #exec (alias: query, async_exec, async_query)
They now auto-detect whether EventMachine is running and either perform commands asynchronously (blocking only the current fiber) or call the parent thread-blocking methods.
If the #async_autoreconnect option is set to true, all of the above methods (in asynchronous mode) try to re-connect after a connection error occurs. This happens behind the scenes, so no error is raised unless a transaction was in progress. In that case the error is raised after the connection is re-established, to signal that the transaction was aborted.
To detect the auto re-connect event, use the #on_autoreconnect property/option.
To enable auto-reconnecting, set:
    client.async_autoreconnect = true
or pass it as a Client.new hash argument:
    PG::EM::Client.new dbname: 'bar', async_autoreconnect: true
There are also new methods:
- Client.connect_defer (alias: async_connect)
- #reset_defer (alias: async_reset)
which are asynchronous versions of PG::Connection.new and PG::Connection#reset.
Additionally the following methods are overloaded:
- Client.new (alias: connect, open, setdb, setdblogin)
- #reset
providing auto-detecting asynchronous (fiber-synchronized) or thread-blocking methods for (re)connecting.
Otherwise nothing changes in the PG::Connection API. See the PG::Connection docs for an explanation of the arguments to the above methods.
Warning:
#describe_prepared and #exec_prepared should only be invoked on the same connection on which #prepare was called. If you are using a ConnectionPool, make sure to acquire a single connection first, or execute the prepare command on every connection using the #on_connect hook.
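The warning above can be illustrated with a small, self-contained sketch. It does not use the gem: PooledSketchConnection is a hypothetical stand-in that only remembers which statements were prepared on it, and it forgets them on reset, since prepared statements belong to a single server session. The hook receives the same three arguments documented for #on_connect.

```ruby
# Hypothetical stand-in for a pooled connection (not the library's class).
class PooledSketchConnection
  def initialize(on_connect)
    @on_connect = on_connect
    connect(false)
  end

  def prepare(name, sql)
    @prepared[name] = sql
  end

  def exec_prepared(name)
    unless @prepared.key?(name)
      raise ArgumentError, "statement #{name} is not prepared on this connection"
    end
    "ran #{@prepared[name]}"
  end

  # A reset starts a fresh session, so earlier prepared statements are gone.
  def reset
    connect(true)
  end

  private

  def connect(is_reset)
    @prepared = {}
    # Same argument order as the documented hook: client, is_async, is_reset.
    @on_connect.call(self, false, is_reset) if @on_connect
  end
end

# The hook prepares the statement on every connection it is called for.
prepare_all = proc do |conn, _is_async, _is_reset|
  conn.prepare('get_one', 'SELECT 1')
end

pool = Array.new(3) { PooledSketchConnection.new(prepare_all) }
answers = pool.map { |conn| conn.exec_prepared('get_one') }

pool.first.reset
after_reset = pool.first.exec_prepared('get_one')
```

Because the hook runs on every connect and reset, any connection taken from the pool can execute the statement, whichever one happened to prepare it first.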
Defined Under Namespace
Modules: ConnectWatcher, Watcher
Constant Summary
- DEFAULT_ASYNC_VARS =
    {
      :@async_autoreconnect => nil,
      :@connect_timeout => nil,
      :@query_timeout => 0,
      :@on_connect => nil,
      :@on_autoreconnect => nil,
      :@async_command_aborted => false,
    }.freeze
- TRAN_BEGIN_QUERY = 'BEGIN'
- TRAN_ROLLBACK_QUERY = 'ROLLBACK'
- TRAN_COMMIT_QUERY = 'COMMIT'
- @@connect_timeout_envvar =
    Environment variable name for the connect_timeout fallback value.
    conndefaults.find{|d| d[:keyword] == "connect_timeout" }[:envvar]
Instance Attribute Summary
- #async_autoreconnect ⇒ Boolean
  Enable/disable auto re-connect feature (true/false).
- #connect_timeout ⇒ Float
  Connection timeout.
- #on_autoreconnect {|pg, error| ... } ⇒ Proc<Client, Error>
  A proc-like object that is called after a connection with the server has been automatically re-established.
- #on_connect {|pg, is_async, is_reset| ... } ⇒ Proc<Client,is_async,is_reset>
  A proc-like object that is called after a connection with the server has been established.
- #query_timeout ⇒ Float
  Aborts async command processing if the server response time exceeds query_timeout seconds.
Deferrable connection methods
- .connect_defer(*args) {|pg| ... } ⇒ FeaturedDeferrable (also: async_connect)
  Attempts to establish the connection asynchronously.
- #reset_defer {|pg| ... } ⇒ FeaturedDeferrable (also: #async_reset)
  Attempts to reset the connection asynchronously.
Auto-sensing fiber-synchronized connection methods
- .new(*args, &blk) ⇒ Object (also: connect, open, setdb, setdblogin)
  Creates a new instance of PG::EM::Client and attempts to establish a connection.
- #reset ⇒ Object
  Attempts to reset the connection.
Deferrable command methods
- #describe_portal_defer(portal_name) {|result| ... } ⇒ FeaturedDeferrable
  Asynchronously sends a command to retrieve information about the portal portal_name, and immediately returns with a Deferrable.
- #describe_prepared_defer(statement_name) {|result| ... } ⇒ FeaturedDeferrable
  Asynchronously sends a command to retrieve information about the prepared statement statement_name, and immediately returns with a Deferrable.
- #exec_defer(sql, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable (also: #query_defer, #async_query_defer, #async_exec_defer, #exec_params_defer)
  Sends the SQL query request specified by sql to PostgreSQL for asynchronous processing, and immediately returns with a Deferrable.
- #exec_prepared_defer(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable
  Executes the named prepared statement specified by statement_name asynchronously, and immediately returns with a Deferrable.
- #get_last_result_defer {|result| ... } ⇒ FeaturedDeferrable
  Asynchronously retrieves all available results on the current connection (from previously issued asynchronous commands like send_query()) and immediately returns with a Deferrable.
- #get_result_defer {|result| ... } ⇒ FeaturedDeferrable
  Asynchronously retrieves the next result from a call to #send_query (or another asynchronous command) and immediately returns with a Deferrable.
- #prepare_defer(stmt_name, sql, param_types = nil) {|result| ... } ⇒ FeaturedDeferrable
  Prepares statement sql with name stmt_name to be executed later asynchronously, and immediately returns with a Deferrable.
- #wait_for_notify_defer(timeout = nil) {|notification| ... } ⇒ FeaturedDeferrable (also: #notifies_wait_defer)
  Asynchronously waits for a notification or until the optional timeout is reached, whichever comes first.
Auto-sensing fiber-synchronized command methods
- #describe_portal(portal_name) {|result| ... } ⇒ PG::Result, Object
  Retrieves information about the portal portal_name.
- #describe_prepared(statement_name) {|result| ... } ⇒ PG::Result, Object
  Retrieves information about the prepared statement statement_name.
- #exec(sql) {|result| ... } ⇒ PG::Result, Object (also: #query, #async_query, #async_exec)
  Sends the SQL query request specified by sql to PostgreSQL.
- #exec_params(sql, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
  Sends the SQL query request specified by sql with optional params and result_format to PostgreSQL.
- #exec_prepared(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
  Executes the named prepared statement specified by statement_name.
- #get_last_result {|result| ... } ⇒ PG::Result, ...
  Retrieves all available results on the current connection (from previously issued asynchronous commands like send_query()) and returns the last non-NULL result, or nil if no results are available.
- #get_result {|result| ... } ⇒ PG::Result, ...
  Retrieves the next result from a call to #send_query (or another asynchronous command).
- #prepare(stmt_name, sql, param_types = nil) {|result| ... } ⇒ PG::Result, Object
  Prepares statement sql with name stmt_name to be executed later.
- #wait_for_notify(timeout = nil) {|name, pid, payload| ... } ⇒ String|nil (also: #notifies_wait)
  Blocks while waiting for notification(s), or until the optional timeout is reached, whichever comes first.
Class Method Summary
- .single_row_mode? ⇒ Boolean
  Returns true if pg supports single row mode or false otherwise.
Instance Method Summary
- #blocking_get_result ⇒ Object
- #blocking_wait_for_notify ⇒ Object
- #finish ⇒ Object (also: #close)
  Closes the backend connection.
- #raise_error(klass = Error, message = error_message) ⇒ Object
- #single_row_mode? ⇒ Boolean
  Returns true if pg supports single row mode or false otherwise.
- #status ⇒ Number
  Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.
- #transaction {|client| ... } ⇒ Object
  Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.
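The #transaction contract described in the summary can be sketched without a database. RecordingClient below is a hypothetical stand-in that records each statement instead of sending it; the three constants mirror the Constant Summary. Re-raising the exception after ROLLBACK is an assumption of this sketch, not something the summary states.

```ruby
# Hypothetical stand-in: records SQL instead of talking to a server.
class RecordingClient
  TRAN_BEGIN_QUERY    = 'BEGIN'
  TRAN_ROLLBACK_QUERY = 'ROLLBACK'
  TRAN_COMMIT_QUERY   = 'COMMIT'

  attr_reader :sent

  def initialize
    @sent = []
  end

  def exec(sql)
    @sent << sql
    sql
  end

  # BEGIN before the block, COMMIT after it, ROLLBACK if the block raises.
  def transaction
    exec(TRAN_BEGIN_QUERY)
    begin
      result = yield self
    rescue Exception
      exec(TRAN_ROLLBACK_QUERY)
      raise # assumption: the original error is passed on to the caller
    end
    exec(TRAN_COMMIT_QUERY)
    result
  end
end

committed = RecordingClient.new
committed.transaction { |client| client.exec('UPDATE t SET n = 1') }

rolled_back = RecordingClient.new
error = begin
  rolled_back.transaction do |client|
    client.exec('UPDATE t SET n = 2')
    raise 'boom'
  end
rescue RuntimeError => e
  e
end
```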
Instance Attribute Details
#async_autoreconnect ⇒ Boolean
Enable/disable auto re-connect feature (true/false). Defaults to false unless #on_autoreconnect is specified in a connection_hash.
Changing #on_autoreconnect with the accessor method doesn’t change the state of #async_autoreconnect.
You may set :async_autoreconnect in a connection_hash argument passed to new or connect_defer.
    # File 'lib/pg/em.rb', line 142
    def async_autoreconnect
      @async_autoreconnect
    end
#connect_timeout ⇒ Float
Connection timeout. Affects #reset and #reset_defer.
Changing this property does not affect thread-blocking #reset unless passed as a connection_hash.
To enable it, set it to a positive value. To disable it, set it to 0.
You may set :connect_timeout in a connection_hash argument passed to new or connect_defer.
    # File 'lib/pg/em.rb', line 117
    def connect_timeout
      @connect_timeout
    end
#on_autoreconnect {|pg, error| ... } ⇒ Proc<Client, Error>
A proc-like object that is called after a connection with the server has been automatically re-established. It is invoked just before the pending command is sent to the server.
The first argument it receives is the connected PG::EM::Client instance. The second is the original error that caused the reconnecting process.
It’s possible to execute queries from the on_autoreconnect hook. The code is executed in a fiber context, so both deferrable and fiber-synchronized query commands may be used.
If an exception is raised during execution of the on_autoreconnect hook, the reset operation will fail with that exception.
The hook can control later actions with its return value:
- false (explicitly; nil is ignored) - the original exception is raised/passed back and the pending query command is not sent again to the server.
- true (explicitly; truthy values are ignored) - the pending command is called regardless of the connection’s last transaction status.
- Exception object - is raised/passed back and the pending command is not sent.
- Deferrable object - the chosen action will depend on the returned deferrable status.
- Other values are ignored and the pending query command is immediately sent to the server unless there was a transaction in progress before the connection was reset.
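The return-value rules above can be summarized as a small decision function. This is only a sketch: autoreconnect_action and the symbols it returns are hypothetical names chosen for illustration, and treating any object that responds to callback and errback as a Deferrable is an assumption, not the library's check.

```ruby
# Hypothetical helper: maps a hook return value to the documented action.
def autoreconnect_action(hook_result, transaction_was_open)
  case hook_result
  when false     then :raise_original_error     # explicit false only; nil falls through
  when true      then :resend_command           # explicit true only; 1, "yes" fall through
  when Exception then :raise_returned_exception
  else
    if hook_result.respond_to?(:callback) && hook_result.respond_to?(:errback)
      :wait_for_deferrable                      # assumption: duck-typed Deferrable
    elsif transaction_was_open
      :raise_original_error                     # the interrupted transaction was aborted
    else
      :resend_command
    end
  end
end

# Minimal object that looks like a Deferrable for the purpose of this sketch.
deferrable_like = Object.new
def deferrable_like.callback; self; end
def deferrable_like.errback; self; end

actions = {
  explicit_false:  autoreconnect_action(false, false),
  explicit_true:   autoreconnect_action(true, true),
  nil_idle:        autoreconnect_action(nil, false),
  nil_in_tran:     autoreconnect_action(nil, true),
  truthy_in_tran:  autoreconnect_action(1, true),
  exception:       autoreconnect_action(RuntimeError.new('stop'), false),
  deferrable:      autoreconnect_action(deferrable_like, false)
}
```

Note that only an explicit true overrides the transaction check; a merely truthy value such as 1 is treated like any other ignored value.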
If both on_connect and on_autoreconnect hooks are set, on_connect is called first and on_autoreconnect is called only when on_connect succeeds.
You may set the :on_autoreconnect hook in a connection_hash argument passed to new or connect_defer.
    # File 'lib/pg/em.rb', line 196
    attr_writer :on_autoreconnect
#on_connect {|pg, is_async, is_reset| ... } ⇒ Proc<Client,is_async,is_reset>
A proc-like object that is called after a connection with the server has been established.
The first argument it receives is the connected PG::EM::Client instance. The second argument is true if the connection was established asynchronously, false otherwise. The third argument is true when the connection has been reset, or false for a new connection.
It’s possible to execute queries from the on_connect hook. The code is executed in a fiber context, so both deferrable and fiber-synchronized query commands may be used. However, deferrable commands will work only if the EventMachine reactor is running, so check that is_async is true.
If an exception is raised during execution of the on_connect hook, the connecting/reset operation will fail with that exception.
The hook can control later actions with its return value:
- Deferrable object - the connection establishing status will depend on the returned deferrable status (only in asynchronous mode).
- Other values are ignored.
If both on_connect and on_autoreconnect hooks are set, on_connect is called first and on_autoreconnect is called only when on_connect succeeds.
You may set the :on_connect hook in a connection_hash argument passed to new or connect_defer.
    # File 'lib/pg/em.rb', line 252
    attr_writer :on_connect
#query_timeout ⇒ Float
Aborts async command processing if the server response time exceeds query_timeout seconds. This does not apply to #reset and #reset_defer.
To enable it, set it to a positive value. To disable it, set it to 0.
You may set :query_timeout in a connection_hash argument passed to new or connect_defer.
    # File 'lib/pg/em.rb', line 129
    def query_timeout
      @query_timeout
    end
Class Method Details
.connect_defer(*args) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect
Attempts to establish the connection asynchronously.
Pass the block to the returned deferrable’s callback to obtain the newly created and already connected PG::EM::Client object. In case of a connection error, the errback hook receives an error object as an argument. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
Special PG::EM::Client options (e.g. #async_autoreconnect) must be provided in a connection_hash argument variant. They will be ignored if passed in a connection_string.
client_encoding will be set according to Encoding.default_internal.
    # File 'lib/pg/em.rb', line 354
    def self.connect_defer(*args, &blk)
      df = FeaturedDeferrable.new(&blk)
      async_args = (args)
      conn = df.protect { connect_start(*args) }
      if conn
        async_args.each {|k, v| conn.instance_variable_set(k, v) }
        ::EM.watch(conn.socket_io, ConnectWatcher, conn, df, false).
          poll_connection_and_check
      end
      df
    end
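The listing applies a set of instance variables to the new connection with instance_variable_set. One way such a set could be derived from the arguments is sketched below. The helper name extract_async_vars is hypothetical and its logic is an assumption built only from what this page documents: the DEFAULT_ASYNC_VARS constant, the rule that special options are honored only in a connection_hash, and the rule that #async_autoreconnect defaults to false unless an on_autoreconnect hook is given.

```ruby
DEFAULT_ASYNC_VARS = {
  :@async_autoreconnect   => nil,
  :@connect_timeout       => nil,
  :@query_timeout         => 0,
  :@on_connect            => nil,
  :@on_autoreconnect      => nil,
  :@async_command_aborted => false,
}.freeze

# Hypothetical helper, not the library's implementation.
def extract_async_vars(args)
  vars = DEFAULT_ASYNC_VARS.dup
  options = args.last
  if options.is_a?(Hash)              # a connection_string is ignored here
    options.each do |key, value|
      ivar = :"@#{key}"
      vars[ivar] = value if vars.key?(ivar)
    end
  end
  # Documented default: false unless an on_autoreconnect hook was supplied.
  if vars[:@async_autoreconnect].nil?
    vars[:@async_autoreconnect] = !vars[:@on_autoreconnect].nil?
  end
  vars
end

from_string = extract_async_vars(['dbname=bar query_timeout=5'])
from_hash   = extract_async_vars([{ dbname: 'bar', query_timeout: 5 }])
with_hook   = extract_async_vars([{ dbname: 'bar', on_autoreconnect: proc { true } }])

# Applying the variables the same way the listing does.
target = Object.new
from_hash.each { |ivar, value| target.instance_variable_set(ivar, value) }
```

In this sketch the string variant keeps every default, which matches the note that special options passed in a connection_string are ignored.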
.new(*args, &blk) ⇒ Object Also known as: connect, open, setdb, setdblogin
Creates a new instance of PG::EM::Client and attempts to establish a connection.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, the command is performed asynchronously, yielding from the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a thread-blocking call to the parent method.
Special PG::EM::Client options (e.g. #async_autoreconnect) must be provided in a connection_hash argument variant. They will be ignored if passed in a connection_string.
client_encoding will be set according to Encoding.default_internal.
    # File 'lib/pg/em.rb', line 456
    def self.new(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        connect_defer(*args) {|r| f.resume(r) }
        conn = Fiber.yield
        raise conn if conn.is_a?(::Exception)
        if block_given?
          begin
            yield conn
          ensure
            conn.finish
          end
        else
          conn
        end
      else
        conn = super(*args)
        if on_connect = conn.on_connect
          on_connect.call(conn, false, false)
        end
        conn
      end
    end
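The auto-sensing pattern in the listing (start a deferrable operation, yield the current fiber, resume it from the callback) can be tried without EventMachine. In the sketch below everything is a stand-in: TinyDeferrable, PENDING, query_defer and query are hypothetical names, the PENDING array plays the role of the reactor, and the check for a running reactor is left out. ROOT_FIBER is assumed to be the fiber captured when the file is loaded.

```ruby
require 'fiber'

ROOT_FIBER = Fiber.current # assumption: captured once, at load time

# Hypothetical stand-in for a deferrable holding a single callback.
class TinyDeferrable
  def callback(&blk)
    @callback = blk
    self
  end

  def succeed(value)
    @callback.call(value) if @callback
  end
end

PENDING = [] # stand-in for operations the reactor has not finished yet

def query_defer(sql)
  df = TinyDeferrable.new
  PENDING << [df, "result of #{sql}"]
  df
end

def query(sql)
  fiber = Fiber.current
  if fiber.equal?(ROOT_FIBER)
    "blocking result of #{sql}" # stands in for the thread-blocking parent call
  else
    query_defer(sql).callback { |result| fiber.resume(result) }
    Fiber.yield # suspend this fiber until the callback resumes it
  end
end

log = []
log << query('SELECT 1')                       # root fiber: blocking path

worker = Fiber.new { log << query('SELECT 2') }
worker.resume                                  # runs until Fiber.yield in query
log << 'other work'                            # root continues while worker waits

deferrable, value = PENDING.shift
deferrable.succeed(value)                      # the "reactor" completes the query
```

The worker fiber sees an ordinary return value from query, even though control went back to the root fiber while the operation was pending.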
.single_row_mode? ⇒ Boolean
Returns true if pg supports single row mode or false otherwise. Single row mode is available since libpq 9.2.
    # File 'lib/pg/em.rb', line 270
    def self.single_row_mode?
      method_defined? :set_single_row_mode
    end
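The check in the listing is plain feature detection with Module#method_defined?. A minimal sketch with two hypothetical stand-in classes (neither is part of the gem) shows how the answer follows from whether the instance method exists:

```ruby
# Stand-in for a connection class built without single row mode support.
class LegacySketchConnection
  def self.single_row_mode?
    method_defined? :set_single_row_mode
  end

  # The instance-level variant delegates to the class, as in the library.
  def single_row_mode?
    self.class.single_row_mode?
  end
end

# Stand-in for a connection class that does provide the method.
class ModernSketchConnection < LegacySketchConnection
  def set_single_row_mode
    self
  end
end

legacy_supported = LegacySketchConnection.single_row_mode?
modern_supported = ModernSketchConnection.new.single_row_mode?
```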
Instance Method Details
#blocking_get_result ⇒ Object
    # File 'lib/pg/em.rb', line 795
    alias_method :blocking_get_result, :get_result
#blocking_wait_for_notify ⇒ Object
    # File 'lib/pg/em.rb', line 794
    alias_method :blocking_wait_for_notify, :wait_for_notify
#describe_portal(portal_name) {|result| ... } ⇒ PG::Result, Object
Retrieves information about the portal portal_name.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
    # File 'lib/pg/em.rb', line 907
    %w(
      exec              exec_defer
      exec_params       exec_defer
      exec_prepared     exec_prepared_defer
      prepare           prepare_defer
      describe_prepared describe_prepared_defer
      describe_portal   describe_portal_defer
      get_last_result   get_last_result_defer
    ).each_slice(2) do |name, defer_name|
      class_eval <<-EOD, __FILE__, __LINE__
        def #{name}(*args, &blk)
          if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
            if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
              raise result
            end
            if block_given? && result
              begin
                yield result
              ensure
                result.clear
              end
            else
              result
            end
          else
            super
          end
        end
      EOD
    end
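The listing defines several methods from one template by walking a flat word list two items at a time. The reduced sketch below uses the same each_slice(2) and class_eval technique on a hypothetical PairedSketchClient whose *_defer methods simply report how they were called; it leaves out the fiber and reactor handling entirely.

```ruby
# Hypothetical stand-in class; only the method-generation technique is real.
class PairedSketchClient
  def exec_defer(*args)
    [:exec_defer, args]
  end

  def prepare_defer(*args)
    [:prepare_defer, args]
  end

  # Each pair is (generated method name, method it forwards to).
  %w(
    exec        exec_defer
    exec_params exec_defer
    prepare     prepare_defer
  ).each_slice(2) do |name, defer_name|
    class_eval <<-EOD, __FILE__, __LINE__ + 1
      def #{name}(*args)
        #{defer_name}(*args)
      end
    EOD
  end
end

client = PairedSketchClient.new
via_exec_params = client.exec_params('SELECT $1', [1])
via_prepare     = client.prepare('stmt', 'SELECT 1')
generated       = PairedSketchClient.instance_methods(false).sort
```

As in the listing, two different names (exec and exec_params) can share one target, which is why a single source listing is shown for many of the methods on this page.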
#describe_portal_defer(portal_name) {|result| ... } ⇒ FeaturedDeferrable
Asynchronously sends a command to retrieve information about the portal portal_name, and immediately returns with a Deferrable.
Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
    # File 'lib/pg/em.rb', line 653
    %w(
      exec_defer              send_query
      prepare_defer           send_prepare
      exec_prepared_defer     send_query_prepared
      describe_prepared_defer send_describe_prepared
      describe_portal_defer   send_describe_portal
    ).each_slice(2) do |defer_name, send_name|
      class_eval <<-EOD, __FILE__, __LINE__
        def #{defer_name}(*args, &blk)
          df = FeaturedDeferrable.new
          send_proc = proc do
            #{send_name}(*args)
            setup_emio_watcher.watch_results(df, send_proc)
          end
          begin
            check_async_command_aborted!
            @last_transaction_status = transaction_status
            send_proc.call
          rescue Error => e
            ::EM.next_tick { async_autoreconnect!(df, e, send_proc) }
          rescue Exception => e
            ::EM.next_tick { df.fail(e) }
          end
          df.completion(&blk) if block_given?
          df
        end
      EOD
    end
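Because a block given to a *_defer method is bound to both hooks, it receives either a result or an error object and has to tell them apart. The sketch below shows that calling convention with SketchDeferrable, a hypothetical stand-in written for this example; it is not FeaturedDeferrable, and only the completion name is taken from the listing.

```ruby
# Hypothetical stand-in for a deferrable with a completion helper.
class SketchDeferrable
  def initialize
    @callbacks = []
    @errbacks = []
  end

  def callback(&blk)
    @callbacks << blk
    self
  end

  def errback(&blk)
    @errbacks << blk
    self
  end

  # Binds one block to both hooks.
  def completion(&blk)
    callback(&blk)
    errback(&blk)
  end

  def succeed(*args)
    @callbacks.each { |hook| hook.call(*args) }
  end

  def fail(*args)
    @errbacks.each { |hook| hook.call(*args) }
  end
end

# The block gets a result on success and an exception on failure.
def describe_outcome(outcome)
  outcome.is_a?(Exception) ? "failed: #{outcome.message}" : "ok: #{outcome}"
end

seen = []

succeeded = SketchDeferrable.new
succeeded.completion { |outcome| seen << describe_outcome(outcome) }
succeeded.succeed('1 row')

failed = SketchDeferrable.new
failed.completion { |outcome| seen << describe_outcome(outcome) }
failed.fail(RuntimeError.new('connection lost'))
```

Code that prefers separate paths can skip the block and attach callback and errback to the returned object instead.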
#describe_prepared(statement_name) {|result| ... } ⇒ PG::Result, Object
Retrieves information about the prepared statement statement_name.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
Defined in lib/pg/em.rb, line 907, by the same generated definition listed under #describe_portal.
#describe_prepared_defer(statement_name) {|result| ... } ⇒ FeaturedDeferrable
Asynchronously sends a command to retrieve information about the prepared statement statement_name, and immediately returns with a Deferrable.
Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
Defined in lib/pg/em.rb, line 653, by the same generated definition listed under #describe_portal_defer.
#exec(sql) {|result| ... } ⇒ PG::Result, Object Also known as: query, async_query, async_exec
Sends the SQL query request specified by sql to PostgreSQL.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
Defined in lib/pg/em.rb, line 907, by the same generated definition listed under #describe_portal.
#exec_defer(sql, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable Also known as: query_defer, async_query_defer, async_exec_defer, exec_params_defer
Sends the SQL query request specified by sql to PostgreSQL for asynchronous processing, and immediately returns with a Deferrable.
Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
Defined in lib/pg/em.rb, line 653, by the same generated definition listed under #describe_portal_defer.
#exec_params(sql, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
Sends the SQL query request specified by sql with optional params and result_format to PostgreSQL.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
Defined in lib/pg/em.rb, line 907, by the same generated definition listed under #describe_portal.
#exec_prepared(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
Executes the named prepared statement specified by statement_name.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
Defined in lib/pg/em.rb, line 907, by the same generated definition listed under #describe_portal.
#exec_prepared_defer(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable
Executes the named prepared statement specified by statement_name asynchronously, and immediately returns with a Deferrable.
Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
Defined in lib/pg/em.rb, line 653, by the same generated definition listed under #describe_portal_defer.
#finish ⇒ Object Also known as: close
Closes the backend connection.
Detaches the watch handler to prevent a memory leak after calling the parent PG::Connection#finish.
    # File 'lib/pg/em.rb', line 500
    def finish
      super
      if @watcher
        @watcher.detach if @watcher.watching?
        @watcher = nil
      end
    end
#get_last_result {|result| ... } ⇒ PG::Result, ...
Retrieves all available results on the current connection (from previously issued asynchronous commands like send_query()) and returns the last non-NULL result, or nil if no results are available.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
Defined in lib/pg/em.rb, line 907, by the same generated definition listed under #describe_portal.
#get_last_result_defer {|result| ... } ⇒ FeaturedDeferrable
Asynchronously retrieves all available results on the current connection (from previously issued asynchronous commands like send_query()) and immediately returns with a Deferrable. It then receives the last non-NULL result on :succeed, or nil if no results are available.
Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
    # File 'lib/pg/em.rb', line 775
    def get_last_result_defer(&blk)
      df = FeaturedDeferrable.new
      begin
        if status == CONNECTION_OK
          setup_emio_watcher.watch_results(df)
        else
          df.succeed
        end
      rescue Error => e
        ::EM.next_tick { async_autoreconnect!(df, e) }
      rescue Exception => e
        ::EM.next_tick { df.fail(e) }
      end
      df.completion(&blk) if block_given?
      df
    end
#get_result {|result| ... } ⇒ PG::Result, ...
Retrieves the next result from a call to #send_query (or another asynchronous command). If no more results are available, returns nil and the block (if given) is never called.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
    # File 'lib/pg/em.rb', line 990
    def get_result
      if is_busy && ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync get_result_defer, f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
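The block contract in the listing (yield the result, always clear it afterwards, and skip the block entirely when there is no result) can be exercised with stand-ins. QueuedResult and QueuedResultConnection below are hypothetical classes backed by an in-memory queue; they only mimic the block handling, not the fiber or socket logic.

```ruby
# Hypothetical stand-in for a result that tracks whether it was cleared.
class QueuedResult
  attr_reader :value

  def initialize(value)
    @value = value
    @cleared = false
  end

  def clear
    @cleared = true
  end

  def cleared?
    @cleared
  end
end

# Hypothetical stand-in connection serving results from a queue.
class QueuedResultConnection
  def initialize(results)
    @queue = results.dup
  end

  def get_result
    result = @queue.shift
    if block_given? && result
      begin
        yield result
      ensure
        result.clear # block form: the result is cleared for the caller
      end
    else
      result
    end
  end
end

results = [QueuedResult.new('first'), QueuedResult.new('second')]
conn = QueuedResultConnection.new(results)
values = []

conn.get_result { |result| values << result.value } # block form

while (result = conn.get_result)                     # plain form: caller clears
  values << result.value
  result.clear
end

block_called = false
last = conn.get_result { block_called = true }       # nothing left: block skipped
```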
#get_result_defer {|result| ... } ⇒ FeaturedDeferrable
Asynchronously retrieves the next result from a call to #send_query (or another asynchronous command) and immediately returns with a Deferrable. It then receives the result object on :succeed, or nil if no results are available.
Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
    # File 'lib/pg/em.rb', line 741
    def get_result_defer(&blk)
      df = FeaturedDeferrable.new
      begin
        if status == CONNECTION_OK
          if is_busy
            setup_emio_watcher.watch_results(df, nil, true)
          else
            df.succeed blocking_get_result
          end
        else
          df.succeed
        end
      rescue Error => e
        ::EM.next_tick { async_autoreconnect!(df, e) }
      rescue Exception => e
        ::EM.next_tick { df.fail(e) }
      end
      df.completion(&blk) if block_given?
      df
    end
#prepare(stmt_name, sql, param_types = nil) {|result| ... } ⇒ PG::Result, Object
Prepares statement sql with name stmt_name to be executed later.
If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while waiting for the server to complete the request.
Otherwise it performs a blocking call to the parent method.
Defined in lib/pg/em.rb, line 907, by the same generated definition listed under #describe_portal.
#prepare_defer(stmt_name, sql, param_types = nil) {|result| ... } ⇒ FeaturedDeferrable
Prepares statement sql with name stmt_name to be executed later asynchronously, and immediately returns with a Deferrable.
Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided, it’s bound to both the callback and errback hooks of the returned deferrable.
Defined in lib/pg/em.rb, line 653, by the same generated definition listed under #describe_portal_defer.
#raise_error(klass = Error, message = error_message) ⇒ Object
# File 'lib/pg/em.rb', line 797

def raise_error(klass=Error, message=error_message)
  error = klass.new(message)
  error.instance_variable_set(:@connection, self)
  raise error
end
#reset ⇒ Object
Attempts to reset the connection.
Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a thread-blocking call to the parent method.
# File 'lib/pg/em.rb', line 418

def reset
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    reset_defer {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    conn
  else
    @async_command_aborted = false
    if @watcher
      @watcher.detach if @watcher.watching?
      @watcher = nil
    end
    super
    @on_connect.call(self, false, true) if @on_connect
    self
  end
end
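The asynchronous branch of `reset` starts the deferred operation, yields the current fiber, and is resumed with either the result or an exception. The following is a minimal sketch of that fiber-synchronization pattern using only Ruby's Fiber. `PENDING`, `start_async` and `sync_call` are hypothetical names for this illustration; `PENDING` stands in for the reactor, which in the real client is EventMachine.

```ruby
require 'fiber' # needed for Fiber.current on Ruby < 3.0; harmless on newer versions

ROOT = Fiber.current   # the fiber this script starts in
PENDING = []           # stand-in for a reactor's queue of pending completions

# Hypothetical asynchronous operation: it stores the callback instead of
# calling it, the way a reactor defers it until I/O completes.
def start_async(value, &on_done)
  PENDING << -> { on_done.call(value) }
end

# Blocks only the calling fiber: schedule the work, then yield until resumed.
def sync_call(value)
  f = Fiber.current
  raise "must not be called from the root fiber" if f.equal?(ROOT)
  start_async(value) { |r| f.resume(r) }
  result = Fiber.yield
  # An exception delivered as a value is re-raised in the waiting fiber.
  raise result if result.is_a?(::Exception)
  result
end

log = []
worker = Fiber.new do
  log << :before
  log << sync_call(:connection_reset)
  begin
    sync_call(RuntimeError.new("reset failed"))
  rescue RuntimeError => e
    log << e.message
  end
end

worker.resume                            # runs until the first Fiber.yield
log << :root_continues                   # the root fiber is free while the worker waits
PENDING.shift.call until PENDING.empty?  # the "reactor" delivers completions
```

The worker fiber reads like blocking code, yet control returns to the root fiber each time it waits, which is why other fibers can make progress in the meantime.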
#reset_defer {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_reset
Attempts to reset the connection asynchronously.
Pass the block to the returned deferrable’s callback to execute after a successful reset. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.
# File 'lib/pg/em.rb', line 382

def reset_defer(&blk)
  @async_command_aborted = false
  df = FeaturedDeferrable.new(&blk)
  # there can be only one watch handler over the socket
  # apparently eventmachine has hard time dealing with more than one
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
  ret = df.protect(:fail) { reset_start }
  unless ret == :fail
    ::EM.watch(self.socket_io, ConnectWatcher, self, df, true).
      poll_connection_and_check
  end
  df
end
#single_row_mode? ⇒ Boolean
Returns true if pg supports single row mode or false otherwise.
# File 'lib/pg/em.rb', line 277

def single_row_mode?
  self.class.single_row_mode?
end
#status ⇒ Number
Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.
Returns PG::CONNECTION_BAD for connections whose async_command_aborted flag was set by an expired query timeout. Otherwise returns whatever PG::Connection#status returns.
# File 'lib/pg/em.rb', line 516

def status
  if @async_command_aborted
    CONNECTION_BAD
  else
    super
  end
end
#transaction {|client| ... } ⇒ Object
Avoid using PG::EM::Client#*_defer calls inside the block or make sure all queries are completed before the provided block terminates.
Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.
Calls to #transaction may be nested, however without sub-transactions (save points). If the innermost transaction block raises an error the transaction is rolled back to the state before the outermost transaction began.
This extends the PG::Connection#transaction method, which does not support nesting in this way.
The method is sensitive to the transaction status and will safely roll back on any SQL error even when it was caught by some rescue block. But consider that rescuing any SQL error within a utility method is a bad idea.
This method works in both blocking and async modes (regardless of the reactor state) and is considered a generic extension to the PG::Connection#transaction method.
# File 'lib/pg/em.rb', line 1067

def transaction
  raise ArgumentError, 'Must supply block for PG::EM::Client#transaction' unless block_given?
  tcount = @client_tran_count.to_i

  case transaction_status
  when PQTRANS_IDLE
    # there is no transaction yet, so let's begin
    exec(TRAN_BEGIN_QUERY)
    # reset transaction count in case user code rolled it back before
    tcount = 0 if tcount != 0
  when PQTRANS_INTRANS
    # transaction in progress, leave it be
  else
    # transaction failed, is in unknown state or command is active
    # in any case calling begin will raise server transaction error
    exec(TRAN_BEGIN_QUERY) # raises PG::InFailedSqlTransaction
  end
  # memoize nested count
  @client_tran_count = tcount + 1
  begin

    result = yield self

  rescue
    # error was raised
    case transaction_status
    when PQTRANS_INTRANS, PQTRANS_INERROR
      # do not rollback if transaction was rolled back before
      # or is in unknown state, which means connection reset is needed
      # and rollback only from the outermost transaction block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    # raise again
    raise
  else
    # we are good (but not out of woods yet)
    case transaction_status
    when PQTRANS_INTRANS
      # commit only from the outermost transaction block
      exec(TRAN_COMMIT_QUERY) if tcount.zero?
    when PQTRANS_INERROR
      # no ruby error was raised (or an error was rescued in code block)
      # but there was an sql error anyway
      # so rollback after the outermost block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    when PQTRANS_IDLE
      # the code block has terminated the transaction on its own
      # so just reset the counter
      tcount = 0
    else
      # something isn't right, so provoke an error just in case
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    result
  ensure
    @client_tran_count = tcount
  end
end
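The nesting rule, under which only the outermost block begins and ends the transaction, can be sketched with an in-memory stand-in. `FakeConnection` is hypothetical and greatly simplified: it tracks a depth counter and a boolean flag instead of the server-reported transaction status the real method consults, and it does not model the failed-transaction states.

```ruby
# Hypothetical in-memory stand-in for a connection; it only records the
# statements it is asked to run and tracks an in-transaction flag.
class FakeConnection
  attr_reader :statements

  def initialize
    @statements = []
    @in_transaction = false
    @depth = 0
  end

  def exec(sql)
    @statements << sql
    case sql
    when 'BEGIN' then @in_transaction = true
    when 'COMMIT', 'ROLLBACK' then @in_transaction = false
    end
  end

  # Simplified nesting rule: only the outermost block issues BEGIN and
  # COMMIT/ROLLBACK; inner blocks just run their body.
  def transaction
    raise ArgumentError, 'block required' unless block_given?
    outermost = @depth.zero?
    exec('BEGIN') if outermost && !@in_transaction
    @depth += 1
    begin
      result = yield self
    rescue
      exec('ROLLBACK') if outermost && @in_transaction
      raise
    else
      exec('COMMIT') if outermost && @in_transaction
      result
    ensure
      @depth -= 1
    end
  end
end

conn = FakeConnection.new
conn.transaction do |c|
  c.exec('INSERT 1')
  c.transaction { |inner| inner.exec('INSERT 2') }
end
committed = conn.statements.dup

conn2 = FakeConnection.new
begin
  conn2.transaction do |c|
    c.exec('INSERT 1')
    c.transaction { raise 'boom' }
  end
rescue RuntimeError
  # the error propagates after the outermost block has rolled back
end
rolled_back = conn2.statements
```

In the second run the inner block raises, yet the single ROLLBACK is issued by the outermost block, which undoes the work of both levels.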
#wait_for_notify(timeout = nil) {|name, pid, payload| ... } ⇒ String|nil Also known as: notifies_wait
Blocks while waiting for notification(s), or until the optional timeout is reached, whichever comes first. Returns nil if timeout is reached, the name of the NOTIFY event otherwise.
If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while the current one is waiting for notifications.
Otherwise performs a blocking call to a parent method.
# File 'lib/pg/em.rb', line 961

def wait_for_notify(timeout = nil)
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    unless notify_hash = notifies
      if (notify_hash = fiber_sync wait_for_notify_defer(timeout), f).is_a?(::Exception)
        raise notify_hash
      end
    end
    if notify_hash
      if block_given?
        yield notify_hash.values_at(:relname, :be_pid, :extra)
      end
      notify_hash[:relname]
    end
  else
    super
  end
end
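The relationship between the notification hash, the block arguments and the return value can be shown in isolation. In this minimal sketch `deliver` is a hypothetical helper that mirrors only the tail of the method above; the sample hash values are made up for the illustration.

```ruby
# Hypothetical helper mirroring how a notification hash is handed to the
# block and turned into the return value.
def deliver(notify_hash)
  return nil unless notify_hash # timeout: nothing to yield, nil is returned
  yield notify_hash.values_at(:relname, :be_pid, :extra) if block_given?
  notify_hash[:relname]
end

seen = nil
sample = { relname: 'jobs', be_pid: 4242, extra: 'id=7' } # made-up values
name = deliver(sample) do |channel, pid, payload|
  # The yielded array is spread across the three block parameters.
  seen = [channel, pid, payload]
end

timed_out = deliver(nil) { |*| raise 'block must not run on timeout' }
```

The block receives the channel name, the notifying backend's PID and the payload, while the method itself returns just the channel name, or nil when the timeout expires.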
#wait_for_notify_defer(timeout = nil) {|notification| ... } ⇒ FeaturedDeferrable Also known as: notifies_wait_defer
Asynchronously waits for a notification or until the optional timeout is reached, whichever comes first. timeout is measured in seconds and can be fractional. Returns immediately with a Deferrable.
Pass the block to the returned deferrable’s callback to obtain the notification hash. In case of a connection error the errback hook is called with an error object. If the timeout is reached nil is passed to the deferrable’s callback. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable. If another call is made to this method before the notification is received (or before reaching timeout) the previous deferrable’s errback will be called with a nil argument.
# File 'lib/pg/em.rb', line 708

def wait_for_notify_defer(timeout = nil, &blk)
  df = FeaturedDeferrable.new
  begin
    check_async_command_aborted!
    if status == CONNECTION_OK
      setup_emio_watcher.watch_notify(df, timeout)
    else
      raise_error ConnectionBad
    end
  rescue Error => e
    ::EM.next_tick { async_autoreconnect!(df, e) }
  rescue Exception => e
    ::EM.next_tick { df.fail(e) }
  end
  df.completion(&blk) if block_given?
  df
end