Class: PG::EM::ConnectionPool

Inherits:
Object
  • Object
show all
Defined in:
lib/pg/em/connection_pool.rb

Overview

Connection pool for PG::EM::Client

Author

Rafal Michalski

The ConnectionPool allocates new connections asynchronously when there are no free connections left up to the #max_size number.

If PG::EM::Client#async_autoreconnect option is not set or the re-connect fails the failed connection is dropped from the pool.

The list of Client command methods that are available in ConnectionPool:

Fiber synchronized methods:

The asynchronous command methods:

The pool will only allow for #max_size commands (both deferred and fiber synchronized) to be performed concurrently. The pending requests will be queued and executed when connections become available.

Please keep in mind, that the above methods may send commands to different clients from the pool each time they are called. You can’t assume anything about which connection is acquired even if the #max_size of the pool is set to one. This is because no connection will be shared between two concurrent requests and the connections maight occasionally fail and they will be dropped from the pool.

This prevents the ‘*_defer` commands to execute transactions.

For transactions use #transaction and fiber synchronized methods.

Examples:

Basic usage

pg = PG::EM::ConnectionPool.new size: 10, dbname: 'foo'
res = pg.query 'select * from bar'

Defined Under Namespace

Classes: DeferredOptions

Constant Summary collapse

DEFAULT_SIZE =
4

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) {|pg, is_async, is_reset| ... } ⇒ ConnectionPool

Creates and initializes a new connection pool.

The connection pool allocates its first connection upon initialization unless lazy: true option is given.

Pass PG::EM::Client options together with ConnectionPool options:

  • :size = 4 - the maximum number of concurrent connections

  • :lazy = false - should lazy allocate first connection

  • :connection_class = PG::EM::Client

For convenience the given block will be set as the on_connect option.

Yield Parameters:

  • pg (Client)

    connected client instance on each newly created connection

  • is_async (Boolean)

    always true in a connection pool context

  • is_reset (Boolean)

    always false unless async_autoreconnect options is true and was actually re-connecting

Raises:

  • (PG::Error)
  • (ArgumentError)

See Also:



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/pg/em/connection_pool.rb', line 94

def initialize(options = {}, &on_connect)
  @available = []
  @pending = []
  @allocated = {}
  @max_size = DEFAULT_SIZE
  @connection_class = Client

  if block_given?
    options = {on_connect: on_connect}.merge(options)
  end

  lazy = false
  @options = options.reject do |key, value|
    case key.to_sym
    when :size, :max_size
      @max_size = value.to_i
      true
    when :connection_class
      @connection_class = value
      true
    when :lazy
      lazy = value
      true
    end
  end

  raise ArgumentError, "#{self.class}.new: pool size must be >= 1" if @max_size < 1

  # allocate first connection, unless we are lazy
  hold unless lazy
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*a, &b) ⇒ Object



341
342
343
# File 'lib/pg/em/connection_pool.rb', line 341

def method_missing(*a, &b)
  hold { |c| c.__send__(*a, &b) }
end

Instance Attribute Details

#allocatedObject (readonly)

Returns the value of attribute allocated.



68
69
70
# File 'lib/pg/em/connection_pool.rb', line 68

def allocated
  @allocated
end

#async_autoreconnectBoolean

Set PG::EM::Client#async_autoreconnect on all present and future connections in this pool or read value from options

Returns:

  • (Boolean)

    asynchronous auto re-connect status



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/pg/em/connection_pool.rb', line 204

%w[connect_timeout
   query_timeout
   async_autoreconnect
   on_connect
   on_autoreconnect].each do |name|
  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      @options[:#{name}] = value
      b = proc { |c| c.#{name} = value }
      @available.each(&b)
      @allocated.each_value(&b)
    end
  EOD
  if name.start_with?('on_')
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}(&hook)
        if block_given?
          self.#{name} = hook
        else
          @options[:#{name}] || @options['#{name}']
        end
      end
    EOD
  else
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}
        @options[:#{name}] || @options['#{name}']
      end
    EOD
  end
  DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      self[:#{name}=] = value
    end
  EOD
end

#availableObject (readonly)

Returns the value of attribute available.



68
69
70
# File 'lib/pg/em/connection_pool.rb', line 68

def available
  @available
end

#connect_timeoutFloat

Set PG::EM::Client#connect_timeout on all present and future connections in this pool or read value from options

Returns:

  • (Float)

    connection timeout in seconds



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/pg/em/connection_pool.rb', line 204

%w[connect_timeout
   query_timeout
   async_autoreconnect
   on_connect
   on_autoreconnect].each do |name|
  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      @options[:#{name}] = value
      b = proc { |c| c.#{name} = value }
      @available.each(&b)
      @allocated.each_value(&b)
    end
  EOD
  if name.start_with?('on_')
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}(&hook)
        if block_given?
          self.#{name} = hook
        else
          @options[:#{name}] || @options['#{name}']
        end
      end
    EOD
  else
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}
        @options[:#{name}] || @options['#{name}']
      end
    EOD
  end
  DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      self[:#{name}=] = value
    end
  EOD
end

#max_sizeInteger (readonly)

Maximum number of connections in the connection pool

Returns:

  • (Integer)


66
67
68
# File 'lib/pg/em/connection_pool.rb', line 66

def max_size
  @max_size
end

#on_autoreconnectProc<Client, Error>

Set PG::EM::Client#on_autoreconnect on all present and future connections in this pool or read value from options

Returns:

  • (Proc<Client, Error>)

    auto re-connect hook



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/pg/em/connection_pool.rb', line 204

%w[connect_timeout
   query_timeout
   async_autoreconnect
   on_connect
   on_autoreconnect].each do |name|
  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      @options[:#{name}] = value
      b = proc { |c| c.#{name} = value }
      @available.each(&b)
      @allocated.each_value(&b)
    end
  EOD
  if name.start_with?('on_')
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}(&hook)
        if block_given?
          self.#{name} = hook
        else
          @options[:#{name}] || @options['#{name}']
        end
      end
    EOD
  else
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}
        @options[:#{name}] || @options['#{name}']
      end
    EOD
  end
  DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      self[:#{name}=] = value
    end
  EOD
end

#on_connectProc<Client,is_async,is_reset>

Set PG::EM::Client#on_connect on all present and future connections in this pool or read value from options

Returns:

  • (Proc<Client,is_async,is_reset>)

    connect hook



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/pg/em/connection_pool.rb', line 204

%w[connect_timeout
   query_timeout
   async_autoreconnect
   on_connect
   on_autoreconnect].each do |name|
  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      @options[:#{name}] = value
      b = proc { |c| c.#{name} = value }
      @available.each(&b)
      @allocated.each_value(&b)
    end
  EOD
  if name.start_with?('on_')
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}(&hook)
        if block_given?
          self.#{name} = hook
        else
          @options[:#{name}] || @options['#{name}']
        end
      end
    EOD
  else
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}
        @options[:#{name}] || @options['#{name}']
      end
    EOD
  end
  DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      self[:#{name}=] = value
    end
  EOD
end

#query_timeoutFloat

Set PG::EM::Client#query_timeout on all present and future connections in this pool or read value from options

Returns:

  • (Float)

    query timeout in seconds



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/pg/em/connection_pool.rb', line 204

%w[connect_timeout
   query_timeout
   async_autoreconnect
   on_connect
   on_autoreconnect].each do |name|
  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      @options[:#{name}] = value
      b = proc { |c| c.#{name} = value }
      @available.each(&b)
      @allocated.each_value(&b)
    end
  EOD
  if name.start_with?('on_')
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}(&hook)
        if block_given?
          self.#{name} = hook
        else
          @options[:#{name}] || @options['#{name}']
        end
      end
    EOD
  else
    class_eval <<-EOD, __FILE__, __LINE__
      def #{name}
        @options[:#{name}] || @options['#{name}']
      end
    EOD
  end
  DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      self[:#{name}=] = value
    end
  EOD
end

Class Method Details

.connect_defer(options = {}) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect

Creates and initializes new connection pool.

Attempts to establish the first connection asynchronously.

Use the returned deferrable’s callback hook to obtain newly created PG::EM::ConnectionPool. In case of a connection error errback hook is called with a raised error object as its argument.

If the block is provided it’s bound to both callback and errback hooks of the returned deferrable.

Pass PG::EM::Client options together with ConnectionPool options:

  • :size = 4 - the maximum number of concurrent connections

  • :connection_class = PG::EM::Client

Yield Parameters:

  • pg (Client|PG::Error)

    new and connected client instance on success or a raised PG::Error

Returns:

Raises:

  • (ArgumentError)


148
149
150
151
152
153
# File 'lib/pg/em/connection_pool.rb', line 148

def self.connect_defer(options = {}, &blk)
  pool = new options.merge(lazy: true)
  pool.__send__(:hold_deferred, blk) do
    ::EM::DefaultDeferrable.new.tap { |df| df.succeed pool }
  end
end

Instance Method Details

#finishObject Also known as: close

Finishes all available connections and clears the available pool.

After call to this method the pool is still usable and will try to allocate new client connections on subsequent query commands.



171
172
173
174
175
# File 'lib/pg/em/connection_pool.rb', line 171

def finish
  @available.each { |c| c.finish }
  @available.clear
  self
end

#hold {|pg| ... } ⇒ Object Also known as: execute

Acquires PG::EM::Client connection and passes it to the given block.

The connection is allocated to the current fiber and ensures that any subsequent query from the same fiber will be performed on the connection.

It is possible to nest hold calls from the same fiber, so each time the block will be given the same PG::EM::Client instance. This feature is needed e.g. for nesting transaction calls.

Yield Parameters:



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/pg/em/connection_pool.rb', line 314

def hold
  fiber = Fiber.current
  id = fiber.object_id

  if conn = @allocated[id]
    skip_release = true
  else
    conn = acquire(fiber) until conn
  end

  begin
    yield conn if block_given?

  rescue PG::Error
    if conn.status != PG::CONNECTION_OK
      conn.finish unless conn.finished?
      drop_failed(id)
      skip_release = true
    end
    raise
  ensure
    release(id) unless skip_release
  end
end

#respond_to_missing?(m, priv = false) ⇒ Boolean

Returns:

  • (Boolean)


345
346
347
# File 'lib/pg/em/connection_pool.rb', line 345

def respond_to_missing?(m, priv = false)
  hold { |c| c.respond_to?(m, priv) }
end

#sizeInteger

Current number of connections in the connection pool

Returns:

  • (Integer)


163
164
165
# File 'lib/pg/em/connection_pool.rb', line 163

def size
  @available.length + @allocated.length
end

#transaction(&blk) ⇒ Object

Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs. Calls to transaction may be nested, however without sub-transactions (save points).

Examples:

Transactions

pg = PG::EM::ConnectionPool.new size: 10
pg.transaction do
  pg.exec('insert into animals (family, species) values ($1,$2)',
          [family, species])
  num = pg.query('select count(*) from people where family=$1',
          [family]).get_value(0,0)
  pg.exec('update stats set count = $1 where family=$2',
          [num, family])
end

See Also:



298
299
300
301
302
# File 'lib/pg/em/connection_pool.rb', line 298

def transaction(&blk)
  hold do |pg|
    pg.transaction(&blk)
  end
end