Class: PG::EM::ConnectionPool
- Inherits:
-
Object
- Object
- PG::EM::ConnectionPool
- Defined in:
- lib/pg/em/connection_pool.rb
Overview
Connection pool for PG::EM::Client
- Author
-
Rafal Michalski
The ConnectionPool allocates new connections asynchronously when there are no free connections left up to the #max_size number.
If PG::EM::Client#async_autoreconnect option is not set or the re-connect fails the failed connection is dropped from the pool.
The list of Client command methods that are available in ConnectionPool:
Fiber synchronized methods:
The asynchronous command methods:
The pool will only allow for #max_size commands (both deferred and fiber synchronized) to be performed concurrently. The pending requests will be queued and executed when connections become available.
Please keep in mind, that the above methods may send commands to different clients from the pool each time they are called. You can’t assume anything about which connection is acquired even if the #max_size of the pool is set to one. This is because no connection will be shared between two concurrent requests and the connections maight occasionally fail and they will be dropped from the pool.
This prevents the ‘*_defer` commands to execute transactions.
For transactions use #transaction and fiber synchronized methods.
Defined Under Namespace
Classes: DeferredOptions
Constant Summary collapse
- DEFAULT_SIZE =
4
Instance Attribute Summary collapse
-
#allocated ⇒ Object
readonly
Returns the value of attribute allocated.
-
#async_autoreconnect ⇒ Boolean
Set PG::EM::Client#async_autoreconnect on all present and future connections in this pool or read value from options.
-
#available ⇒ Object
readonly
Returns the value of attribute available.
-
#connect_timeout ⇒ Float
Set PG::EM::Client#connect_timeout on all present and future connections in this pool or read value from options.
-
#max_size ⇒ Integer
readonly
Maximum number of connections in the connection pool.
-
#on_autoreconnect ⇒ Proc<Client, Error>
Set PG::EM::Client#on_autoreconnect on all present and future connections in this pool or read value from options.
-
#on_connect ⇒ Proc<Client,is_async,is_reset>
Set PG::EM::Client#on_connect on all present and future connections in this pool or read value from options.
-
#query_timeout ⇒ Float
Set PG::EM::Client#query_timeout on all present and future connections in this pool or read value from options.
Class Method Summary collapse
-
.connect_defer(options = {}) {|pg| ... } ⇒ FeaturedDeferrable
(also: async_connect)
Creates and initializes new connection pool.
Instance Method Summary collapse
-
#finish ⇒ Object
(also: #close)
Finishes all available connections and clears the available pool.
-
#hold {|pg| ... } ⇒ Object
(also: #execute)
Acquires Client connection and passes it to the given block.
-
#initialize(options = {}) {|pg, is_async, is_reset| ... } ⇒ ConnectionPool
constructor
Creates and initializes a new connection pool.
- #method_missing(*a, &b) ⇒ Object
- #respond_to_missing?(m, priv = false) ⇒ Boolean
-
#size ⇒ Integer
Current number of connections in the connection pool.
-
#transaction(&blk) ⇒ Object
Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.
Constructor Details
#initialize(options = {}) {|pg, is_async, is_reset| ... } ⇒ ConnectionPool
Creates and initializes a new connection pool.
The connection pool allocates its first connection upon initialization unless lazy: true option is given.
Pass PG::EM::Client options
together with ConnectionPool options
:
-
:size
=4
- the maximum number of concurrent connections -
:lazy
= false - should lazy allocate first connection -
:connection_class
= PG::EM::Client
For convenience the given block will be set as the on_connect
option.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/pg/em/connection_pool.rb', line 94 def initialize( = {}, &on_connect) @available = [] @pending = [] @allocated = {} @max_size = DEFAULT_SIZE @connection_class = Client if block_given? = {on_connect: on_connect}.merge() end lazy = false @options = .reject do |key, value| case key.to_sym when :size, :max_size @max_size = value.to_i true when :connection_class @connection_class = value true when :lazy lazy = value true end end raise ArgumentError, "#{self.class}.new: pool size must be >= 1" if @max_size < 1 # allocate first connection, unless we are lazy hold unless lazy end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(*a, &b) ⇒ Object
341 342 343 |
# File 'lib/pg/em/connection_pool.rb', line 341 def method_missing(*a, &b) hold { |c| c.__send__(*a, &b) } end |
Instance Attribute Details
#allocated ⇒ Object (readonly)
Returns the value of attribute allocated.
68 69 70 |
# File 'lib/pg/em/connection_pool.rb', line 68 def allocated @allocated end |
#async_autoreconnect ⇒ Boolean
Set PG::EM::Client#async_autoreconnect on all present and future connections in this pool or read value from options
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/pg/em/connection_pool.rb', line 204 %w[connect_timeout query_timeout async_autoreconnect on_connect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end EOD if name.start_with?('on_') class_eval <<-EOD, __FILE__, __LINE__ def #{name}(&hook) if block_given? self.#{name} = hook else @options[:#{name}] || @options['#{name}'] end end EOD else class_eval <<-EOD, __FILE__, __LINE__ def #{name} @options[:#{name}] || @options['#{name}'] end EOD end DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
#available ⇒ Object (readonly)
Returns the value of attribute available.
68 69 70 |
# File 'lib/pg/em/connection_pool.rb', line 68 def available @available end |
#connect_timeout ⇒ Float
Set PG::EM::Client#connect_timeout on all present and future connections in this pool or read value from options
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/pg/em/connection_pool.rb', line 204 %w[connect_timeout query_timeout async_autoreconnect on_connect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end EOD if name.start_with?('on_') class_eval <<-EOD, __FILE__, __LINE__ def #{name}(&hook) if block_given? self.#{name} = hook else @options[:#{name}] || @options['#{name}'] end end EOD else class_eval <<-EOD, __FILE__, __LINE__ def #{name} @options[:#{name}] || @options['#{name}'] end EOD end DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
#max_size ⇒ Integer (readonly)
Maximum number of connections in the connection pool
66 67 68 |
# File 'lib/pg/em/connection_pool.rb', line 66 def max_size @max_size end |
#on_autoreconnect ⇒ Proc<Client, Error>
Set PG::EM::Client#on_autoreconnect on all present and future connections in this pool or read value from options
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/pg/em/connection_pool.rb', line 204 %w[connect_timeout query_timeout async_autoreconnect on_connect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end EOD if name.start_with?('on_') class_eval <<-EOD, __FILE__, __LINE__ def #{name}(&hook) if block_given? self.#{name} = hook else @options[:#{name}] || @options['#{name}'] end end EOD else class_eval <<-EOD, __FILE__, __LINE__ def #{name} @options[:#{name}] || @options['#{name}'] end EOD end DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
#on_connect ⇒ Proc<Client,is_async,is_reset>
Set PG::EM::Client#on_connect on all present and future connections in this pool or read value from options
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/pg/em/connection_pool.rb', line 204 %w[connect_timeout query_timeout async_autoreconnect on_connect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end EOD if name.start_with?('on_') class_eval <<-EOD, __FILE__, __LINE__ def #{name}(&hook) if block_given? self.#{name} = hook else @options[:#{name}] || @options['#{name}'] end end EOD else class_eval <<-EOD, __FILE__, __LINE__ def #{name} @options[:#{name}] || @options['#{name}'] end EOD end DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
#query_timeout ⇒ Float
Set PG::EM::Client#query_timeout on all present and future connections in this pool or read value from options
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/pg/em/connection_pool.rb', line 204 %w[connect_timeout query_timeout async_autoreconnect on_connect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end EOD if name.start_with?('on_') class_eval <<-EOD, __FILE__, __LINE__ def #{name}(&hook) if block_given? self.#{name} = hook else @options[:#{name}] || @options['#{name}'] end end EOD else class_eval <<-EOD, __FILE__, __LINE__ def #{name} @options[:#{name}] || @options['#{name}'] end EOD end DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
Class Method Details
.connect_defer(options = {}) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect
Creates and initializes new connection pool.
Attempts to establish the first connection asynchronously.
Use the returned deferrable’s callback
hook to obtain newly created PG::EM::ConnectionPool. In case of a connection error errback
hook is called with a raised error object as its argument.
If the block is provided it’s bound to both callback
and errback
hooks of the returned deferrable.
Pass PG::EM::Client options
together with ConnectionPool options
:
-
:size
=4
- the maximum number of concurrent connections -
:connection_class
= PG::EM::Client
148 149 150 151 152 153 |
# File 'lib/pg/em/connection_pool.rb', line 148 def self.connect_defer( = {}, &blk) pool = new .merge(lazy: true) pool.__send__(:hold_deferred, blk) do ::EM::DefaultDeferrable.new.tap { |df| df.succeed pool } end end |
Instance Method Details
#finish ⇒ Object Also known as: close
Finishes all available connections and clears the available pool.
After call to this method the pool is still usable and will try to allocate new client connections on subsequent query commands.
171 172 173 174 175 |
# File 'lib/pg/em/connection_pool.rb', line 171 def finish @available.each { |c| c.finish } @available.clear self end |
#hold {|pg| ... } ⇒ Object Also known as: execute
Acquires PG::EM::Client connection and passes it to the given block.
The connection is allocated to the current fiber and ensures that any subsequent query from the same fiber will be performed on the connection.
It is possible to nest hold calls from the same fiber, so each time the block will be given the same PG::EM::Client instance. This feature is needed e.g. for nesting transaction calls.
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/pg/em/connection_pool.rb', line 314 def hold fiber = Fiber.current id = fiber.object_id if conn = @allocated[id] skip_release = true else conn = acquire(fiber) until conn end begin yield conn if block_given? rescue PG::Error if conn.status != PG::CONNECTION_OK conn.finish unless conn.finished? drop_failed(id) skip_release = true end raise ensure release(id) unless skip_release end end |
#respond_to_missing?(m, priv = false) ⇒ Boolean
345 346 347 |
# File 'lib/pg/em/connection_pool.rb', line 345 def respond_to_missing?(m, priv = false) hold { |c| c.respond_to?(m, priv) } end |
#size ⇒ Integer
Current number of connections in the connection pool
163 164 165 |
# File 'lib/pg/em/connection_pool.rb', line 163 def size @available.length + @allocated.length end |
#transaction(&blk) ⇒ Object
Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs. Calls to transaction may be nested, however without sub-transactions (save points).
298 299 300 301 302 |
# File 'lib/pg/em/connection_pool.rb', line 298 def transaction(&blk) hold do |pg| pg.transaction(&blk) end end |