Class: PG::EM::Client

Inherits:
Connection
  • Object
Defined in:
lib/pg/em.rb,
lib/pg/em/client/watcher.rb,
lib/pg/em/client/connect_watcher.rb

Overview

PostgreSQL EventMachine client

Author

Rafal Michalski

Client is a PG::Connection wrapper designed for EventMachine.

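New methods are added to execute queries asynchronously; each returns a Deferrable object. They are the *_defer methods listed under Instance Method Details, such as #exec_defer, #prepare_defer and #get_result_defer.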

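The corresponding methods of PG::Connection, including #exec, #exec_params, #exec_prepared, #prepare, #describe_prepared, #describe_portal, #get_result and #get_last_result, are overridden. They detect whether EventMachine is running and either perform the command asynchronously (blocking only the current fiber) or call the parent thread-blocking method.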
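The auto-detection described here can be illustrated without EventMachine or a database. The following is a minimal sketch only: FakeReactor, REACTOR and run_query are hypothetical stand-ins invented for this illustration and are not part of pg-em. It shows a method that suspends the calling fiber when a reactor is running and the caller is not the root fiber, and otherwise falls back to a blocking path.

```ruby
require 'fiber' # needed for Fiber.current on older Rubies; a no-op on newer ones

# Hypothetical stand-ins for this sketch; none of these names belong to pg-em.
ROOT_FIBER = Fiber.current

class FakeReactor
  def initialize
    @queue = []
    @running = false
  end

  def running?
    @running
  end

  # Schedule a block to run once the current work has yielded.
  def next_tick(&blk)
    @queue << blk
  end

  # Run the given block, then drain the scheduled callbacks.
  def run
    @running = true
    yield
    @queue.shift.call until @queue.empty?
  ensure
    @running = false
  end
end

REACTOR = FakeReactor.new

def run_query(sql)
  fiber = Fiber.current
  if REACTOR.running? && !fiber.equal?(ROOT_FIBER)
    # Asynchronous path: resume this fiber later with the "result".
    REACTOR.next_tick { fiber.resume("async: #{sql}") }
    Fiber.yield
  else
    # Blocking path: no reactor, or we are on the root fiber.
    "blocking: #{sql}"
  end
end

results = []
REACTOR.run do
  Fiber.new { results << run_query("select 1") }.resume
  results << run_query("select 2") # root fiber inside the reactor
end
results << run_query("select 3")   # reactor not running
```

In this sketch the query issued from the non-root fiber completes only after the root fiber has continued, which is the sense in which only the current fiber is blocked.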

If the #async_autoreconnect option is set to true, all of the above methods (in asynchronous mode) try to re-connect after a connection error occurs. This happens behind the scenes, so no error is raised unless a transaction was in progress. In that case the error is raised after the connection is re-established, to signal that the transaction was aborted.

To detect the auto re-connect event, use the #on_autoreconnect property/option.

To enable auto-reconnecting set:

client.async_autoreconnect = true

or pass as Client.new hash argument:

PG::EM::Client.new dbname: 'bar', async_autoreconnect: true

There are also new methods, Client.connect_defer and #reset_defer, which are asynchronous versions of PG::Connection.new and PG::Connection#reset.

Additionally, Client.new and #reset are overridden to provide auto-detecting asynchronous (fiber-synchronized) or thread-blocking methods for (re)connecting.

Otherwise the PG::Connection API is unchanged. See the PG::Connection docs for an explanation of the arguments to the above methods.

Warning:

#describe_prepared and #exec_prepared must be invoked on the same connection on which #prepare was called. If you are using a ConnectionPool, either acquire a single connection first or run the prepare command on every connection using the #on_connect hook.

Defined Under Namespace

Modules: ConnectWatcher, Watcher

Constant Summary

DEFAULT_ASYNC_VARS =
{
  :@async_autoreconnect => nil,
  :@connect_timeout => nil,
  :@query_timeout => 0,
  :@on_connect => nil,
  :@on_autoreconnect => nil,
  :@async_command_aborted => false,
}.freeze
TRAN_BEGIN_QUERY =
'BEGIN'
TRAN_ROLLBACK_QUERY =
'ROLLBACK'
TRAN_COMMIT_QUERY =
'COMMIT'
@@connect_timeout_envvar =
conndefaults.find{|d| d[:keyword] == "connect_timeout" }[:envvar]

Environment variable name for the connect_timeout fallback value.


Instance Attribute Details

#async_autoreconnect ⇒ Boolean

Enable/disable auto re-connect feature (true/false). Defaults to false unless #on_autoreconnect is specified in a connection_hash.

Changing #on_autoreconnect with the accessor method does not change the state of #async_autoreconnect.

You may set :async_autoreconnect in a connection_hash argument passed to new or connect_defer.
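The default rule above can be sketched as a small helper. This is an illustration only: effective_autoreconnect is a hypothetical name, and the sketch assumes nothing about how pg-em parses its options internally. It only restates the documented rule that the flag defaults to false unless an on_autoreconnect hook is given in the connection_hash.

```ruby
# Hypothetical helper, not part of pg-em: computes the initial
# async_autoreconnect value for a given connection_hash.
def effective_autoreconnect(connection_hash)
  # An explicit :async_autoreconnect always wins; otherwise the flag
  # is on only when an :on_autoreconnect hook was supplied.
  connection_hash.fetch(:async_autoreconnect) do
    !connection_hash[:on_autoreconnect].nil?
  end
end

hook = proc { |pg, error| nil }

plain     = effective_autoreconnect(dbname: 'bar')
with_hook = effective_autoreconnect(dbname: 'bar', on_autoreconnect: hook)
forced    = effective_autoreconnect(dbname: 'bar', on_autoreconnect: hook,
                                    async_autoreconnect: false)
```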

Returns:

  • (Boolean)

    asynchronous auto re-connect status



# File 'lib/pg/em.rb', line 142

def async_autoreconnect
  @async_autoreconnect
end

#connect_timeout ⇒ Float

Connection timeout. Affects #reset and #reset_defer.

Changing this property does not affect thread-blocking #reset unless passed as a connection_hash.

To enable it, set it to a positive value; to disable it, set it to 0.

You may set :connect_timeout in a connection_hash argument passed to new or connect_defer.

Returns:

  • (Float)

    connection timeout in seconds



# File 'lib/pg/em.rb', line 117

def connect_timeout
  @connect_timeout
end

#on_autoreconnect {|pg, error| ... } ⇒ Proc<Client, Error>

A proc-like object that is called after the connection with the server has been automatically re-established. It is invoked just before the pending command is sent to the server.

The first argument it receives is the connected PG::EM::Client instance. The second is the original error that caused the reconnecting process.

It’s possible to execute queries from the on_autoreconnect hook. The code is executed in a fiber context, so both deferrable and fiber-synchronized query commands may be used.

If an exception is raised during execution of the on_autoreconnect hook, the reset operation fails with that exception.

The hook can control later actions with its return value:

  • false (explicitly; nil is ignored) - the original exception is raised/passed back and the pending query command is not sent again to the server.

  • true (explicitly; other truthy values are ignored) - the pending command is sent regardless of the connection’s last transaction status.

  • Exception object - it is raised/passed back and the pending command is not sent.

  • Deferrable object - the chosen action depends on the returned deferrable’s status.

  • Other values are ignored and the pending query command is sent immediately to the server, unless a transaction was in progress before the connection was reset.

If both the on_connect and on_autoreconnect hooks are set, on_connect is called first and on_autoreconnect is called only if on_connect succeeds.

You may set :on_autoreconnect hook in a connection_hash argument passed to new or connect_defer.

Examples:

How to use deferrable in on_autoreconnect hook

pg.on_autoreconnect do |pg, e|
  logger.warn "PG connection was reset: #{e.inspect}, delaying 1 sec."
  EM::DefaultDeferrable.new.tap do |df|
    EM.add_timer(1) { df.succeed }
  end
end
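The return-value rules for the hook can also be read as a small decision function. The sketch below is hypothetical: autoreconnect_action and its symbols are invented for illustration and do not mirror pg-em's internal code. It treats any object responding to callback and errback as a deferrable, which is an assumption of this sketch.

```ruby
# Hypothetical helper, not part of pg-em: maps the hook's return value
# to the action described in the list of return values.
def autoreconnect_action(hook_result, original_error, in_transaction)
  case hook_result
  when false     then [:raise, original_error]   # explicit false only
  when true      then [:resend, nil]             # explicit true only
  when Exception then [:raise, hook_result]
  else
    if hook_result.respond_to?(:callback) && hook_result.respond_to?(:errback)
      # Deferrable-like object: the outcome depends on its status.
      [:wait_for_deferrable, hook_result]
    elsif in_transaction
      # Any other value: the aborted transaction is signalled with the error.
      [:raise, original_error]
    else
      [:resend, nil]
    end
  end
end

lost   = RuntimeError.new("connection lost")
custom = ArgumentError.new("give up")

on_false       = autoreconnect_action(false, lost, false)
on_nil         = autoreconnect_action(nil, lost, false)
on_nil_in_tx   = autoreconnect_action(nil, lost, true)
on_true_in_tx  = autoreconnect_action(true, lost, true)
on_exception   = autoreconnect_action(custom, lost, false)
```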

Yield Parameters:

  • pg (Client)

    re-connected client instance

  • error (Exception)

    an error after which the auto re-connect process began.

Yield Returns:

  • (false|true|Exception|EM::Deferrable|*)

Returns:

  • (Proc<Client, Error>)

    auto re-connect hook



# File 'lib/pg/em.rb', line 196

attr_writer :on_autoreconnect

#on_connect {|pg, is_async, is_reset| ... } ⇒ Proc<Client,is_async,is_reset>

A proc-like object that is called after a connection with the server has been established.

The first argument it receives is the connected PG::EM::Client instance. The second argument is true if the connection was established asynchronously, false otherwise. The third argument is true when the connection has been reset, or false for a new connection.

It’s possible to execute queries from the on_connect hook. The code is executed in a fiber context, so both deferrable and fiber-synchronized query commands may be used. However, deferrable commands work only if the EventMachine reactor is running, so check that is_async is true.

If an exception is raised during execution of the on_connect hook, the connecting/reset operation fails with that exception.

The hook can control later actions with its return value:

  • Deferrable object - the connection establishing status will depend on the returned deferrable status (only in asynchronous mode).

  • Other values are ignored.

If both the on_connect and on_autoreconnect hooks are set, on_connect is called first and on_autoreconnect is called only if on_connect succeeds.

You may set :on_connect hook in a connection_hash argument passed to new or connect_defer.

Examples:

How to use prepare in on_connect hook

PG::EM::Client.new(on_connect: proc {|pg|
  pg.prepare("species_by_name",
   "select id, name from animals where species=$1 order by name")
})

Yield Parameters:

  • pg (Client)

    connected client instance

  • is_async (Boolean)

    flag indicating if the connection was established asynchronously

  • is_reset (Boolean)

    flag indicating if the connection client was reset

Yield Returns:

  • (EM::Deferrable|*)

Returns:

  • (Proc<Client,is_async,is_reset>)

    connect hook



# File 'lib/pg/em.rb', line 252

attr_writer :on_connect

#query_timeout ⇒ Float

Aborts async command processing if the server response time exceeds query_timeout seconds. This does not apply to #reset and #reset_defer.

To enable it, set it to a positive value; to disable it, set it to 0.

You may set :query_timeout in a connection_hash argument passed to new or connect_defer.

Returns:

  • (Float)

    query timeout in seconds



# File 'lib/pg/em.rb', line 129

def query_timeout
  @query_timeout
end

Class Method Details

.connect_defer(*args) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect

Attempts to establish the connection asynchronously.

Pass a block to the returned deferrable’s callback to obtain the newly created and already connected PG::EM::Client object. In case of a connection error, the errback hook receives an error object as its argument. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.

Special PG::EM::Client options (e.g.: #async_autoreconnect) must be provided in a connection_hash argument variant. They will be ignored if passed in a connection_string.

client_encoding will be set according to Encoding.default_internal.
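Binding one block to both hooks means the block must tell success from failure by inspecting its argument. The sketch below illustrates that with TinyDeferrable, a hypothetical and heavily simplified stand-in written for this example; it is not EM::Deferrable or pg-em's FeaturedDeferrable, and it ignores callbacks registered after completion.

```ruby
# Hypothetical, simplified deferrable used only for illustration.
class TinyDeferrable
  def initialize
    @callbacks = []
    @errbacks = []
  end

  def callback(&blk)
    @callbacks << blk
    self
  end

  def errback(&blk)
    @errbacks << blk
    self
  end

  # Bind the same block to both the callback and errback hooks.
  def completion(&blk)
    callback(&blk)
    errback(&blk)
  end

  def succeed(*args)
    @callbacks.each { |cb| cb.call(*args) }
  end

  def fail(*args)
    @errbacks.each { |eb| eb.call(*args) }
  end
end

seen = []
handler = proc do |result|
  # The same block receives either the connected client or the error.
  seen << (result.is_a?(Exception) ? "error: #{result.message}" : "connected: #{result}")
end

ok = TinyDeferrable.new
ok.completion(&handler)
ok.succeed("client")

bad = TinyDeferrable.new
bad.completion(&handler)
bad.fail(RuntimeError.new("refused"))
```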

Yield Parameters:

  • pg (Client|PG::Error)

    new and connected client instance on success or an instance of raised PG::Error

Returns:

  • (FeaturedDeferrable)



# File 'lib/pg/em.rb', line 354

def self.connect_defer(*args, &blk)
  df = FeaturedDeferrable.new(&blk)
  async_args = parse_async_options(args)
  conn = df.protect { connect_start(*args) }
  if conn
    async_args.each {|k, v| conn.instance_variable_set(k, v) }
    ::EM.watch(conn.socket_io, ConnectWatcher, conn, df, false).
      poll_connection_and_check
  end
  df
end

.new(*args, &blk) ⇒ Object Also known as: connect, open, setdb, setdblogin

Creates a new instance of PG::EM::Client and attempts to establish a connection.

Connects asynchronously, yielding from the current fiber, if the EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can run while waiting for the server to complete the request.

Otherwise performs a thread-blocking call to the parent method.

Special PG::EM::Client options (e.g.: #async_autoreconnect) must be provided in a connection_hash argument variant. They will be ignored if passed in a connection_string.

client_encoding will be set according to Encoding.default_internal.

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 456

def self.new(*args, &blk)
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    connect_defer(*args) {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    if block_given?
      begin
        yield conn
      ensure
        conn.finish
      end
    else
      conn
    end
  else
    conn = super(*args)
    if on_connect = conn.on_connect
      on_connect.call(conn, false, false)
    end
    conn
  end
end

.single_row_mode? ⇒ Boolean

Returns true if pg supports single row mode or false otherwise. Single row mode is available since libpq 9.2.
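The source shown for this method relies on method_defined? to detect the feature. A minimal sketch of that technique, using two hypothetical classes (LegacyConnection and ModernConnection are invented here and stand in for builds of pg with and without the feature):

```ruby
# Hypothetical classes for illustration; they are not part of pg or pg-em.
class LegacyConnection
end

class ModernConnection
  def set_single_row_mode
    self
  end
end

# Feature detection: ask the class whether the instance method exists
# instead of comparing version numbers.
def supports_single_row_mode?(klass)
  klass.method_defined?(:set_single_row_mode)
end

legacy = supports_single_row_mode?(LegacyConnection)
modern = supports_single_row_mode?(ModernConnection)
```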

Returns:

  • (Boolean)

# File 'lib/pg/em.rb', line 270

def self.single_row_mode?
  method_defined? :set_single_row_mode
end

Instance Method Details

#blocking_get_result ⇒ Object



# File 'lib/pg/em.rb', line 795

alias_method :blocking_get_result, :get_result

#blocking_wait_for_notify ⇒ Object



# File 'lib/pg/em.rb', line 794

alias_method :blocking_wait_for_notify, :wait_for_notify

#describe_portal(portal_name) {|result| ... } ⇒ PG::Result, Object

Retrieves information about the portal portal_name.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 907

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  get_last_result   get_last_result_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#describe_portal_defer(portal_name) {|result| ... } ⇒ FeaturedDeferrable

Asynchronously sends command to retrieve information about the portal portal_name, and immediately returns with a Deferrable.

Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

  • (FeaturedDeferrable)



# File 'lib/pg/em.rb', line 653

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = FeaturedDeferrable.new
    send_proc = proc do
      #{send_name}(*args)
      setup_emio_watcher.watch_results(df, send_proc)
    end
    begin
      check_async_command_aborted!
      @last_transaction_status = transaction_status
      send_proc.call
    rescue Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df.completion(&blk) if block_given?
    df
  end
  EOD

end

#describe_prepared(statement_name) {|result| ... } ⇒ PG::Result, Object

Retrieves information about the prepared statement statement_name.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 907

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  get_last_result   get_last_result_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#describe_prepared_defer(statement_name) {|result| ... } ⇒ FeaturedDeferrable

Asynchronously sends command to retrieve information about the prepared statement statement_name, and immediately returns with a Deferrable.

Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

  • (FeaturedDeferrable)



# File 'lib/pg/em.rb', line 653

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = FeaturedDeferrable.new
    send_proc = proc do
      #{send_name}(*args)
      setup_emio_watcher.watch_results(df, send_proc)
    end
    begin
      check_async_command_aborted!
      @last_transaction_status = transaction_status
      send_proc.call
    rescue Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df.completion(&blk) if block_given?
    df
  end
  EOD

end

#exec(sql) {|result| ... } ⇒ PG::Result, Object Also known as: query, async_query, async_exec

Sends SQL query request specified by sql to PostgreSQL.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 907

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  get_last_result   get_last_result_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#exec_defer(sql, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable Also known as: query_defer, async_query_defer, async_exec_defer, exec_params_defer

Sends SQL query request specified by sql to PostgreSQL for asynchronous processing, and immediately returns with deferrable.

Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

  • (FeaturedDeferrable)



# File 'lib/pg/em.rb', line 653

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = FeaturedDeferrable.new
    send_proc = proc do
      #{send_name}(*args)
      setup_emio_watcher.watch_results(df, send_proc)
    end
    begin
      check_async_command_aborted!
      @last_transaction_status = transaction_status
      send_proc.call
    rescue Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df.completion(&blk) if block_given?
    df
  end
  EOD

end

#exec_params(sql, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object

Sends SQL query request specified by sql with optional params and result_format to PostgreSQL.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 907

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  get_last_result   get_last_result_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#exec_prepared(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object

Executes prepared named statement specified by statement_name.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 907

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  get_last_result   get_last_result_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#exec_prepared_defer(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable

Executes the prepared named statement specified by statement_name asynchronously and immediately returns with a Deferrable.

Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

  • (FeaturedDeferrable)



# File 'lib/pg/em.rb', line 653

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = FeaturedDeferrable.new
    send_proc = proc do
      #{send_name}(*args)
      setup_emio_watcher.watch_results(df, send_proc)
    end
    begin
      check_async_command_aborted!
      @last_transaction_status = transaction_status
      send_proc.call
    rescue Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df.completion(&blk) if block_given?
    df
  end
  EOD

end

#finish ⇒ Object Also known as: close

Closes the backend connection.

Detaches the watch handler after calling the parent PG::Connection#finish, to prevent a memory leak.



# File 'lib/pg/em.rb', line 500

def finish
  super
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
end

#get_last_result {|result| ... } ⇒ PG::Result, ...

Retrieves all available results on the current connection (from previously issued asynchronous commands like send_query()) and returns the last non-NULL result, or nil if no results are available.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

  • (nil)

    if no more results

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 907

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  get_last_result   get_last_result_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#get_last_result_defer {|result| ... } ⇒ FeaturedDeferrable

Asynchronously retrieves all available results on the current connection (from previously issued asynchronous commands like send_query()) and immediately returns with a Deferrable. It then receives the last non-NULL result on :succeed, or nil if no results are available.

Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error|nil)

    command result on success or a PG::Error instance on error or nil if no results are available.

Returns:

  • (FeaturedDeferrable)



# File 'lib/pg/em.rb', line 775

def get_last_result_defer(&blk)
  df = FeaturedDeferrable.new
  begin
    if status == CONNECTION_OK
      setup_emio_watcher.watch_results(df)
    else
      df.succeed
    end
  rescue Error => e
    ::EM.next_tick { async_autoreconnect!(df, e) }
  rescue Exception => e
    ::EM.next_tick { df.fail(e) }
  end
  df.completion(&blk) if block_given?
  df
end

#get_result {|result| ... } ⇒ PG::Result, ...

Retrieves the next result from a call to #send_query (or another asynchronous command). If no more results are available returns nil and the block (if given) is never called.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.
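The block form follows a yield-then-clear pattern: the block runs only when there is a result, and the result is cleared afterwards even if the block raises. A minimal sketch of that pattern, assuming a hypothetical FakeResult class and with_result helper (neither is part of pg or pg-em):

```ruby
# Hypothetical stand-in for PG::Result, used only for illustration.
class FakeResult
  attr_reader :rows

  def initialize(rows)
    @rows = rows
    @cleared = false
  end

  def clear
    @cleared = true
  end

  def cleared?
    @cleared
  end
end

# Hypothetical helper showing the pattern: without a block (or without a
# result) the value is returned as is; with both, the block's value is
# returned and the result is cleared in an ensure clause.
def with_result(result)
  return result unless block_given? && result

  begin
    yield result
  ensure
    result.clear
  end
end

first = FakeResult.new([1, 2])
row_count = with_result(first) { |res| res.rows.size }

block_called = false
no_result = with_result(nil) { block_called = true }

kept = FakeResult.new([3])
returned = with_result(kept)
```

When no block is given the caller owns the result and is responsible for clearing it.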

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

  • (nil)

    if no more results

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 990

def get_result
  if is_busy && ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    if (result = fiber_sync get_result_defer, f).is_a?(::Exception)
      raise result
    end
    if block_given? && result
      begin
        yield result
      ensure
        result.clear
      end
    else
      result
    end
  else
    super
  end
end

#get_result_defer {|result| ... } ⇒ FeaturedDeferrable

Asynchronously retrieves the next result from a call to #send_query (or another asynchronous command) and immediately returns with a Deferrable. It then receives the result object on :succeed, or nil if no results are available.

Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error|nil)

    command result on success or a PG::Error instance on error or nil if no results are available.

Returns:

  • (FeaturedDeferrable)



# File 'lib/pg/em.rb', line 741

def get_result_defer(&blk)
  df = FeaturedDeferrable.new
  begin
    if status == CONNECTION_OK
      if is_busy
        setup_emio_watcher.watch_results(df, nil, true)
      else
        df.succeed blocking_get_result
      end
    else
      df.succeed
    end
  rescue Error => e
    ::EM.next_tick { async_autoreconnect!(df, e) }
  rescue Exception => e
    ::EM.next_tick { df.fail(e) }
  end
  df.completion(&blk) if block_given?
  df
end

#prepare(stmt_name, sql, param_types = nil) {|result| ... } ⇒ PG::Result, Object

Prepares statement sql with name stmt_name to be executed later.

If EventMachine reactor is running and the current fiber isn’t the root fiber this method performs command asynchronously yielding current fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 907

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  get_last_result   get_last_result_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        if (result = fiber_sync #{defer_name}(*args), f).is_a?(::Exception)
          raise result
        end
        if block_given? && result
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end
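
The fiber-synchronized behaviour described above can be sketched without EventMachine. In the example below, PENDING, fake_defer and fake_query are hypothetical names invented for illustration: PENDING stands in for the reactor's queue of ready callbacks, and fake_query mimics the shape of the generated methods (yield the fiber, resume it with the result, raise if the result is an exception). It is a simplified model, not the library's fiber_sync.

```ruby
require 'fiber'

ROOT = Fiber.current

# Hypothetical queue standing in for the reactor's pending callbacks.
PENDING = []

# Pretend to start an asynchronous command; the block runs later with the value.
def fake_defer(value, &on_done)
  PENDING << -> { on_done.call(value) }
end

# Fiber-aware wrapper: from the root fiber it answers directly ("blocking"),
# otherwise it yields the current fiber until the deferred value arrives.
def fake_query(value)
  f = Fiber.current
  return "blocking:#{value}" if f.equal?(ROOT)

  fake_defer(value) { |result| f.resume(result) }
  result = Fiber.yield
  raise result if result.is_a?(Exception)
  result
end

log = []
log << fake_query(1)                          # root fiber: no yielding

Fiber.new { log << fake_query(2) }.resume     # yields, control returns here

Fiber.new do
  begin
    fake_query(RuntimeError.new('boom'))      # an error result is raised
  rescue => e
    log << "raised:#{e.message}"
  end
end.resume

log << :reactor_turn
PENDING.shift.call until PENDING.empty?       # "reactor" resumes the fibers

# log is now ["blocking:1", :reactor_turn, 2, "raised:boom"]
```

Both fibers are suspended before :reactor_turn is logged, which is the point of the pattern: the caller's code reads sequentially while the waiting happens elsewhere.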

#prepare_defer(stmt_name, sql, param_types = nil) {|result| ... } ⇒ FeaturedDeferrable

Prepares statement sql with name stmt_name to be executed later asynchronously, and immediately returns with a Deferrable.

Use the returned Deferrable’s callback and errback methods to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

  • (FeaturedDeferrable)

# File 'lib/pg/em.rb', line 653

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = FeaturedDeferrable.new
    send_proc = proc do
      #{send_name}(*args)
      setup_emio_watcher.watch_results(df, send_proc)
    end
    begin
      check_async_command_aborted!
      @last_transaction_status = transaction_status
      send_proc.call
    rescue Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df.completion(&blk) if block_given?
    df
  end
  EOD

end
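
The listing above generates several methods from a flat list of name pairs. A reduced sketch of that each_slice plus class_eval technique follows. FakeConnection and its recording send_* methods are hypothetical stand-ins, and the generated bodies only record the call and return a marker symbol; the real methods also create a deferrable and set up a watcher.

```ruby
# Hypothetical class used only to demonstrate the generation technique.
class FakeConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Stand-ins for the low-level send_* methods; they just record the call.
  def send_query(*args)
    @sent << [:send_query, args]
  end

  def send_prepare(*args)
    @sent << [:send_prepare, args]
  end

  # Each pair is (method to define, method to delegate to).
  %w(
    exec_defer    send_query
    prepare_defer send_prepare
  ).each_slice(2) do |defer_name, send_name|
    # __LINE__ + 1 makes backtraces point at the first heredoc line.
    class_eval <<-EOD, __FILE__, __LINE__ + 1
      def #{defer_name}(*args)
        #{send_name}(*args)
        :#{defer_name}_started
      end
    EOD
  end
end

conn = FakeConnection.new
a = conn.prepare_defer('stmt', 'select $1::int')
b = conn.exec_defer('select 1')

# a == :prepare_defer_started
# conn.sent.first == [:send_prepare, ['stmt', 'select $1::int']]
```

Using a string passed to class_eval, rather than define_method, lets the generated methods call the target by a literal name, at the cost of less readable source.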

#raise_error(klass = Error, message = error_message) ⇒ Object



# File 'lib/pg/em.rb', line 797

def raise_error(klass=Error, message=error_message)
  error = klass.new(message)
  error.instance_variable_set(:@connection, self)
  raise error
end

#reset ⇒ Object

Attempts to reset the connection.

Performs the command asynchronously, yielding from the current fiber, if the EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can run while waiting for the server to complete the request.

Otherwise performs a thread-blocking call to the parent method.

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 418

def reset
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    reset_defer {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    conn
  else
    @async_command_aborted = false
    if @watcher
      @watcher.detach if @watcher.watching?
      @watcher = nil
    end
    super
    @on_connect.call(self, false, true) if @on_connect
    self
  end
end

#reset_defer {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_reset

Attempts to reset the connection asynchronously.

Pass the block to the returned deferrable’s callback to execute after a successful reset. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • pg (Client|PG::Error)

    reconnected client instance on success or an instance of raised PG::Error

Returns:

  • (FeaturedDeferrable)

# File 'lib/pg/em.rb', line 382

def reset_defer(&blk)
  @async_command_aborted = false
  df = FeaturedDeferrable.new(&blk)
  # there can be only one watch handler over the socket
  # apparently eventmachine has hard time dealing with more than one
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
  ret = df.protect(:fail) { reset_start }
  unless ret == :fail
    ::EM.watch(self.socket_io, ConnectWatcher, self, df, true).
      poll_connection_and_check
  end
  df
end

#single_row_mode? ⇒ Boolean

Returns true if pg supports single row mode or false otherwise.

Returns:

  • (Boolean)

# File 'lib/pg/em.rb', line 277

def single_row_mode?
  self.class.single_row_mode?
end

#status ⇒ Number

Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.

Returns PG::CONNECTION_BAD for connections whose async_command_aborted flag was set by an expired query timeout. Otherwise returns whatever PG::Connection#status returns.

Returns:

  • (Number)

# File 'lib/pg/em.rb', line 516

def status
  if @async_command_aborted
    CONNECTION_BAD
  else
    super
  end
end
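
The override above is small enough to model directly. In this sketch BaseConn and TimeoutAwareConn are hypothetical classes, and the constant values 0 and 1 are placeholders chosen for the example rather than values taken from this documentation.

```ruby
# Hypothetical parent that always reports a healthy connection.
class BaseConn
  CONNECTION_OK = 0   # placeholder value for the sketch
  CONNECTION_BAD = 1  # placeholder value for the sketch

  def status
    CONNECTION_OK
  end
end

# The subclass reports BAD while the aborted flag is set and
# defers to the parent otherwise.
class TimeoutAwareConn < BaseConn
  attr_writer :async_command_aborted

  def status
    @async_command_aborted ? CONNECTION_BAD : super
  end
end

conn = TimeoutAwareConn.new
before = conn.status
conn.async_command_aborted = true   # e.g. after a query timeout expired
during = conn.status
conn.async_command_aborted = false  # e.g. after a reset
after = conn.status
```

The reset methods documented above clear the same flag, which is why a reset makes the status reflect the underlying connection again.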

#transaction {|client| ... } ⇒ Object

Note:

Avoid using PG::EM::Client#*_defer calls inside the block or make sure all queries are completed before the provided block terminates.

Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.

Calls to #transaction may be nested, however without sub-transactions (save points). If the innermost transaction block raises an error the transaction is rolled back to the state before the outermost transaction began.

This extends PG::Connection#transaction, which does not support nesting in this way.

The method is sensitive to the transaction status and will safely roll back on any SQL error, even when the error was caught by a rescue block. Note, however, that rescuing SQL errors inside a utility method is a bad idea.

This method works in both blocking and async modes (regardless of the reactor state) and is considered a generic extension to the PG::Connection#transaction method.

Examples:

Nested transaction example

def add_comment(user_id, text)
  db.transaction do
    cmt_id = db.query(
      'insert into comments (user_id, text) values ($1, $2) returning id',
      [user_id, text]).getvalue(0,0)
    db.query(
      'update users set last_comment_id=$2 where id=$1', [user_id, cmt_id])
    cmt_id
  end
end

def update_comment_count(page_id)
  db.transaction do
    count = db.query('select count(*) from comments where page_id=$1', [page_id]).getvalue(0,0)
    db.query('update pages set comment_count=$2 where id=$1', [page_id, count])
  end
end

# to run add_comment and update_comment_count within the same transaction
db.transaction do
  add_comment(user_id, some_text)
  update_comment_count(page_id)
end

Yield Parameters:

  • client (self)

Returns:

  • (Object)

    result of the block

Raises:

  • (ArgumentError)

# File 'lib/pg/em.rb', line 1067

def transaction
  raise ArgumentError, 'Must supply block for PG::EM::Client#transaction' unless block_given?
  tcount = @client_tran_count.to_i

  case transaction_status
  when PQTRANS_IDLE
    # there is no transaction yet, so let's begin
    exec(TRAN_BEGIN_QUERY)
    # reset transaction count in case user code rolled it back before
    tcount = 0 if tcount != 0
  when PQTRANS_INTRANS
    # transaction in progress, leave it be
  else
    # transaction failed, is in unknown state or command is active
    # in any case calling begin will raise server transaction error
    exec(TRAN_BEGIN_QUERY) # raises PG::InFailedSqlTransaction
  end
  # memoize nested count
  @client_tran_count = tcount + 1
  begin

    result = yield self

  rescue
    # error was raised
    case transaction_status
    when PQTRANS_INTRANS, PQTRANS_INERROR
      # do not rollback if transaction was rolled back before
      # or is in unknown state, which means connection reset is needed
      # and rollback only from the outermost transaction block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    # raise again
    raise
  else
    # we are good (but not out of woods yet)
    case transaction_status
    when PQTRANS_INTRANS
      # commit only from the outermost transaction block
      exec(TRAN_COMMIT_QUERY) if tcount.zero?
    when PQTRANS_INERROR
      # no ruby error was raised (or an error was rescued in code block)
      # but there was an sql error anyway
      # so rollback after the outermost block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    when PQTRANS_IDLE
      # the code block has terminated the transaction on its own
      # so just reset the counter
      tcount = 0
    else
      # something isn't right, so provoke an error just in case
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    result
  ensure
    @client_tran_count = tcount
  end
end
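
The nesting rule (only the outermost block begins, commits or rolls back) can be checked with a recording stand-in. FakeTxClient below is a hypothetical class written for this sketch; it tracks depth with a simple counter and a boolean instead of the server-reported transaction status, so it does not cover the PQTRANS_INERROR and unknown-state branches of the real method.

```ruby
# Hypothetical stand-in that only records statements; not a real connection.
class FakeTxClient
  attr_reader :log

  def initialize
    @log = []
    @in_tx = false
    @depth = 0
  end

  # Record the statement and track whether a transaction is open.
  def exec(sql)
    @log << sql
    @in_tx = true if sql == 'BEGIN'
    @in_tx = false if %w(COMMIT ROLLBACK).include?(sql)
  end

  # Simplified nesting: inner calls join the outer transaction.
  def transaction
    raise ArgumentError, 'block required' unless block_given?
    outer = @depth.zero?
    exec('BEGIN') if outer && !@in_tx
    @depth += 1
    begin
      result = yield self
    rescue
      exec('ROLLBACK') if outer && @in_tx
      raise
    else
      exec('COMMIT') if outer && @in_tx
      result
    ensure
      @depth -= 1
    end
  end
end

ok = FakeTxClient.new
value = ok.transaction do |c|
  c.transaction { c.exec('INSERT 1') }
  c.transaction { c.exec('UPDATE 2') }
  :done
end
# ok.log is ["BEGIN", "INSERT 1", "UPDATE 2", "COMMIT"]

failed = FakeTxClient.new
message = nil
begin
  failed.transaction do |c|
    c.exec('INSERT 1')
    c.transaction { raise 'inner failure' }
  end
rescue RuntimeError => e
  message = e.message
end
# failed.log is ["BEGIN", "INSERT 1", "ROLLBACK"]
```

The second run shows the behaviour described above: an error raised in the innermost block undoes everything back to the outermost BEGIN, and the error still reaches the caller.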

#wait_for_notify(timeout = nil) {|name, pid, payload| ... } ⇒ String|nil Also known as: notifies_wait

Blocks while waiting for notification(s), or until the optional timeout is reached, whichever comes first. Returns nil if timeout is reached, the name of the NOTIFY event otherwise.

If the EventMachine reactor is running and the current fiber isn’t the root fiber, this method performs the command asynchronously, yielding the current fiber. Other fibers can run while the current one is waiting for notifications.

Otherwise performs a blocking call to a parent method.

Yield Parameters:

  • name (String)

    the name of the NOTIFY event

  • pid (Number)

    the generating pid

  • payload (String)

    the optional payload

Returns:

  • (String|nil)

Raises:

  • (PG::Error)

# File 'lib/pg/em.rb', line 961

def wait_for_notify(timeout = nil)
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    unless notify_hash = notifies
      if (notify_hash = fiber_sync wait_for_notify_defer(timeout), f).is_a?(::Exception)
        raise notify_hash
      end
    end
    if notify_hash
      if block_given?
        yield notify_hash.values_at(:relname, :be_pid, :extra)
      end
      notify_hash[:relname]
    end
  else
    super
  end
end
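
The listing yields a single array built with values_at, which a block with three parameters receives as name, pid and payload. The sketch below isolates that step; deliver is a hypothetical helper and the hash contents are made-up sample data, with only the keys :relname, :be_pid and :extra taken from the listing.

```ruby
# Sample notification hash; the values are invented for the example.
notify_hash = { relname: 'jobs', be_pid: 4242, extra: 'id=7' }

# Hypothetical helper mirroring the tail of the method above:
# yield the three fields, then return the event name (or nil on timeout).
def deliver(notify_hash)
  return nil unless notify_hash
  yield notify_hash.values_at(:relname, :be_pid, :extra) if block_given?
  notify_hash[:relname]
end

received = nil
name = deliver(notify_hash) { |n, pid, payload| received = [n, pid, payload] }

# A nil hash models the timeout case: the block is never called.
timed_out = deliver(nil) { |*| raise 'not reached' }

# name == 'jobs', received == ['jobs', 4242, 'id=7'], timed_out == nil
```

A block declared with a single parameter would receive the whole array instead, so declaring all three parameters is the safer style.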

#wait_for_notify_defer(timeout = nil) {|notification| ... } ⇒ FeaturedDeferrable Also known as: notifies_wait_defer

Asynchronously waits for notification or until the optional timeout is reached, whichever comes first. timeout is measured in seconds and can be fractional. Returns immediately with a Deferrable.

Pass the block to the returned deferrable’s callback to obtain the notification hash. In case of a connection error, the errback hook is called with an error object. If the timeout is reached, nil is passed to the deferrable’s callback. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable. If another call is made to this method before the notification is received (or before the timeout is reached), the previous deferrable’s errback is called with a nil argument.

Yield Parameters:

  • notification (Hash|nil|Error)

    notification hash, a PG::Error instance on error, or nil when the timeout is reached or the wait is canceled.

Returns:

  • (FeaturedDeferrable)

# File 'lib/pg/em.rb', line 708

def wait_for_notify_defer(timeout = nil, &blk)
  df = FeaturedDeferrable.new
  begin
    check_async_command_aborted!
    if status == CONNECTION_OK
      setup_emio_watcher.watch_notify(df, timeout)
    else
      raise_error ConnectionBad
    end
  rescue Error => e
    ::EM.next_tick { async_autoreconnect!(df, e) }
  rescue Exception => e
    ::EM.next_tick { df.fail(e) }
  end
  df.completion(&blk) if block_given?
  df
end