Class: Sequel::TimedQueueConnectionPool
- Inherits: ConnectionPool
- Inheritance chain: Object → ConnectionPool → Sequel::TimedQueueConnectionPool
- Defined in: lib/sequel/connection_pool/timed_queue.rb
Overview
A connection pool allowing multi-threaded access to a pool of connections, using a timed queue (only available in Ruby 3.2+).
Constant Summary
Constants inherited from ConnectionPool
ConnectionPool::OPTS, ConnectionPool::POOL_CLASS_MAP
Instance Attribute Summary
- #max_size ⇒ Object (readonly)
  The maximum number of connections this pool will create.
Attributes inherited from ConnectionPool
#after_connect, #connect_sqls, #db
Instance Method Summary
- #all_connections ⇒ Object
  Yield all of the available connections, and the one currently allocated to this thread.
- #disconnect(opts = OPTS) ⇒ Object
  Removes all connections currently in the pool’s queue.
- #hold(server = nil) ⇒ Object
  Chooses the first available connection, or if none are available, creates a new connection.
- #initialize(db, opts = OPTS) ⇒ TimedQueueConnectionPool (constructor)
  The following additional options are respected:
  - :max_connections: The maximum number of connections the connection pool will open (default 4).
  - :pool_timeout: The number of seconds to wait to acquire a connection before raising a PoolTimeout (default 5).
- #pool_type ⇒ Object
- #size ⇒ Object
  The total number of connections in the pool.
Methods inherited from ConnectionPool
Methods included from ConnectionPool::ClassMethods
Constructor Details
#initialize(db, opts = OPTS) ⇒ TimedQueueConnectionPool
The following additional options are respected:
- :max_connections: The maximum number of connections the connection pool will open (default 4).
- :pool_timeout: The number of seconds to wait to acquire a connection before raising a PoolTimeout (default 5).
    # File 'lib/sequel/connection_pool/timed_queue.rb', line 18
    def initialize(db, opts = OPTS)
      super
      @max_size = Integer(opts[:max_connections] || 4)
      raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
      @mutex = Mutex.new
      # Size inside array so this still works while the pool is frozen.
      @size = [0]
      @allocated = {}
      @allocated.compare_by_identity
      @timeout = Float(opts[:pool_timeout] || 5)
      @queue = Queue.new
    end
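The option handling in the constructor can be tried in isolation. The sketch below mirrors the two coercions from the source in a standalone helper. `pool_options` is a hypothetical name, and `ArgumentError` stands in for `Sequel::Error` so the snippet runs without Sequel loaded.

```ruby
# Hypothetical helper mirroring the coercions in #initialize.
# ArgumentError stands in for Sequel::Error so this runs without Sequel.
def pool_options(opts = {})
  max_size = Integer(opts[:max_connections] || 4)
  raise ArgumentError, ':max_connections must be positive' if max_size < 1
  timeout = Float(opts[:pool_timeout] || 5)
  { max_size: max_size, timeout: timeout }
end

defaults = pool_options
# String and integer inputs are coerced to Integer and Float respectively.
custom = pool_options(max_connections: '10', pool_timeout: 2)
```

Because `Integer()` and `Float()` are strict conversions, a non-numeric value such as `'many'` raises instead of silently becoming zero.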
Instance Attribute Details
#max_size ⇒ Object (readonly)
The maximum number of connections this pool will create.
    # File 'lib/sequel/connection_pool/timed_queue.rb', line 11
    def max_size
      @max_size
    end
Instance Method Details
#all_connections ⇒ Object
Yield all of the available connections, and the one currently allocated to this thread. This will not yield connections currently allocated to other threads, as it is not safe to operate on them.
    # File 'lib/sequel/connection_pool/timed_queue.rb', line 34
    def all_connections
      hold do |conn|
        yield conn

        # Use a hash to record all connections already seen. As soon as we
        # come across a connection we've already seen, we stop the loop.
        conns = {}
        conns.compare_by_identity
        while true
          conn = nil
          begin
            break unless (conn = @queue.pop(timeout: 0)) && !conns[conn]
            conns[conn] = true
            yield conn
          ensure
            @queue.push(conn) if conn
          end
        end
      end
    end
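The loop in #all_connections visits each queued connection once by popping it, recording it in an identity-keyed hash, and pushing it back, stopping when a previously seen connection comes around again. A minimal sketch of that cycle on a bare `Queue`, using plain strings as stand-ins for connections (an assumption for illustration) and requiring Ruby 3.2+ for `pop(timeout:)`:

```ruby
# Requires Ruby 3.2+ for Queue#pop(timeout:), like the pool itself.
queue = Queue.new
%w[conn_a conn_b conn_c].each { |c| queue.push(c) }

seen = {}.compare_by_identity # track objects by identity, not by value
visited = []

loop do
  conn = nil
  begin
    # pop(timeout: 0) returns nil right away when the queue is empty.
    break unless (conn = queue.pop(timeout: 0)) && !seen[conn]
    seen[conn] = true
    visited << conn
  ensure
    # Always push the connection back, including the repeat that ends the loop.
    queue.push(conn) if conn
  end
end
```

After the loop, the queue holds the same three objects it started with, though their order may have rotated.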
#disconnect(opts = OPTS) ⇒ Object
Removes all connections currently in the pool’s queue. This method has the effect of disconnecting from the database, assuming that no connections are currently being used.
Once a connection is requested using #hold, the connection pool creates new connections to the database.
    # File 'lib/sequel/connection_pool/timed_queue.rb', line 61
    def disconnect(opts=OPTS)
      while conn = @queue.pop(timeout: 0)
        disconnect_connection(conn)
      end
      fill_queue
      nil
    end
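The caveat "assuming that no connections are currently being used" follows from the drain loop: only connections sitting in the queue are popped, so one that is checked out is never seen. A small sketch of that effect, where `FakeConn` is a hypothetical stand-in for a real connection and the `fill_queue` step from the source is left out:

```ruby
# Requires Ruby 3.2+ for Queue#pop(timeout:).
# FakeConn is a toy stand-in for a database connection, not a Sequel class.
FakeConn = Struct.new(:name, :open)

queue = Queue.new
idle = [FakeConn.new('a', true), FakeConn.new('b', true)]
idle.each { |c| queue.push(c) }
in_use = FakeConn.new('c', true) # checked out, so it is not in the queue

# Drain the queue without blocking, closing each idle connection.
while (conn = queue.pop(timeout: 0))
  conn.open = false
end
```

Here both idle connections end up closed while `in_use` stays open, which is why the method only fully disconnects when nothing is checked out.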
#hold(server = nil) ⇒ Object
Chooses the first available connection, or if none are available, creates a new connection. Passes the connection to the supplied block:
pool.hold {|conn| conn.execute('DROP TABLE posts')}
Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.
If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.
    # File 'lib/sequel/connection_pool/timed_queue.rb', line 82
    def hold(server=nil)
      t = Sequel.current
      if conn = owned_connection(t)
        return yield(conn)
      end

      begin
        conn = acquire(t)
        yield conn
      rescue Sequel::DatabaseDisconnectError, *@error_classes => e
        if disconnect_error?(e)
          oconn = conn
          conn = nil
          disconnect_connection(oconn) if oconn
          sync{@allocated.delete(t)}
          fill_queue
        end
        raise
      ensure
        release(t) if conn
      end
    end
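The re-entrancy and timeout behavior described for #hold can be observed with a toy pool built on a bare `Queue`. This is a sketch under simplifying assumptions, not Sequel's implementation: `ToyPool` and `ToyPoolTimeout` are hypothetical names, the pool holds a fixed set of pre-built objects rather than creating connections on demand, and disconnect handling is omitted. It requires Ruby 3.2+ for `pop(timeout:)`.

```ruby
# Hypothetical error class standing in for Sequel::PoolTimeout.
class ToyPoolTimeout < StandardError; end

# Toy pool over pre-built "connections"; not Sequel's implementation.
class ToyPool
  def initialize(conns, timeout)
    @queue = Queue.new
    conns.each { |c| @queue.push(c) }
    @timeout = timeout
    @mutex = Mutex.new
    @allocated = {}.compare_by_identity # thread => connection
  end

  def hold
    t = Thread.current
    # Re-entrant path: this thread already owns a connection.
    if (conn = @mutex.synchronize { @allocated[t] })
      return yield(conn)
    end

    # Wait up to @timeout seconds; nil means the wait timed out.
    conn = @queue.pop(timeout: @timeout)
    raise ToyPoolTimeout, "timeout: #{@timeout}" unless conn
    @mutex.synchronize { @allocated[t] = conn }
    begin
      yield conn
    ensure
      # Release the connection even if the block raised.
      @mutex.synchronize { @allocated.delete(t) }
      @queue.push(conn)
    end
  end
end

pool = ToyPool.new([:conn1], 0.05)

# A nested hold in the same thread reuses the connection instead of blocking.
nested_same = pool.hold { |outer| pool.hold { |inner| outer.equal?(inner) } }

# A second thread cannot get the only connection while it is held.
timed_out = pool.hold do
  Thread.new do
    pool.hold { false }
  rescue ToyPoolTimeout
    true
  end.value
end
```

With a single connection and a 0.05 second timeout, the nested call succeeds immediately while the second thread gives up after the timeout.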
#pool_type ⇒ Object
    # File 'lib/sequel/connection_pool/timed_queue.rb', line 105
    def pool_type
      :timed_queue
    end
#size ⇒ Object
The total number of connections in the pool.
    # File 'lib/sequel/connection_pool/timed_queue.rb', line 110
    def size
      sync{@size[0]}
    end
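The constructor stores the count as `@size = [0]` with the comment that this still works while the pool is frozen, and #size reads `@size[0]`. The reason is that `freeze` is shallow: a frozen object cannot reassign its instance variables, but an array it references can still be mutated. A short illustration, with `FrozenCounter` as a hypothetical class:

```ruby
# Hypothetical class contrasting an array-wrapped counter with a plain one.
class FrozenCounter
  def initialize
    @size = [0] # mutable container, as in the pool's @size
    @plain = 0  # plain instance variable for comparison
  end

  def size
    @size[0]
  end

  def grow
    @size[0] += 1 # mutates the array, not the frozen object
  end

  def grow_plain
    @plain += 1 # reassigns an instance variable, which a frozen object forbids
  end
end

counter = FrozenCounter.new.freeze
counter.grow

plain_error = begin
  counter.grow_plain
  nil
rescue FrozenError => e
  e.class
end
```

The array-backed counter advances on the frozen object, while the plain counter raises `FrozenError`.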