Class: Sequel::ShardedTimedQueueConnectionPool
- Inherits: ConnectionPool
  - Object
  - ConnectionPool
  - Sequel::ShardedTimedQueueConnectionPool
- Defined in: lib/sequel/connection_pool/sharded_timed_queue.rb
Overview
A connection pool allowing multi-threaded access to a sharded pool of connections, using a timed queue (only available in Ruby 3.2+).
Constant Summary
Constants inherited from ConnectionPool
ConnectionPool::OPTS, ConnectionPool::POOL_CLASS_MAP
Instance Attribute Summary
-
#max_size ⇒ Object
readonly
The maximum number of connections this pool will create per shard.
Attributes inherited from ConnectionPool
#after_connect, #connect_sqls, #db
Instance Method Summary
-
#add_servers(servers) ⇒ Object
Adds new servers to the connection pool.
-
#all_connections ⇒ Object
Yield all of the available connections, and the one currently allocated to this thread (if one is allocated).
-
#disconnect(opts = OPTS) ⇒ Object
Removes all connections currently in the pool’s queue.
-
#hold(server = :default) ⇒ Object
Chooses the first available connection for the given server, or if none are available, creates a new connection.
-
#initialize(db, opts = OPTS) ⇒ ShardedTimedQueueConnectionPool
constructor
The following additional options are respected:
- :max_connections: The maximum number of connections the connection pool will open per shard (default 4).
- :pool_timeout: The number of seconds to wait to acquire a connection before raising a PoolTimeout (default 5).
- :servers: A hash of servers to use.
-
#pool_type ⇒ Object
-
#remove_servers(servers) ⇒ Object
Remove servers from the connection pool.
-
#servers ⇒ Object
Return an array of symbols for servers in the connection pool.
-
#size(server = :default) ⇒ Object
The total number of connections in the pool.
Methods included from ConnectionPool::ClassMethods
Constructor Details
#initialize(db, opts = OPTS) ⇒ ShardedTimedQueueConnectionPool
The following additional options are respected:
- :max_connections: The maximum number of connections the connection pool will open per shard (default 4).
- :pool_timeout: The number of seconds to wait to acquire a connection before raising a PoolTimeout (default 5).
- :servers: A hash of servers to use. Keys should be symbols. If not present, a single :default server is used.
- :servers_hash: The base hash to use for the servers. By default, Sequel uses Hash.new(:default). You can use a hash with a default proc that raises an error if you want to catch all cases where a nonexistent server is used.
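To illustrate the :servers_hash option, the following minimal sketch contrasts the default Hash.new(:default) with a hash whose default proc raises. It uses plain Ruby hashes only; the error class and message are illustrative choices, not something Sequel prescribes.

```ruby
# Default behavior: lookups for unknown shards fall back to :default.
lenient = Hash.new(:default)
lenient[:read_only] = :read_only

# Stricter alternative: the default proc raises for unknown shards.
# ArgumentError and the message text are assumptions made for this sketch.
strict = Hash.new { |_hash, key| raise ArgumentError, "unknown server: #{key.inspect}" }
strict[:default] = :default

lenient_lookup = lenient[:typo] # silently resolves to :default

strict_error =
  begin
    strict[:typo]
  rescue ArgumentError => e
    e.message
  end
```

A hash like strict would then be supplied as the :servers_hash option when the pool is created.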
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 24
    def initialize(db, opts = OPTS)
      super

      @max_size = Integer(opts[:max_connections] || 4)
      raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
      @mutex = Mutex.new
      @timeout = Float(opts[:pool_timeout] || 5)

      @allocated = {}
      @sizes = {}
      @queues = {}
      @servers = opts.fetch(:servers_hash, Hash.new(:default))

      add_servers([:default])
      add_servers(opts[:servers].keys) if opts[:servers]
    end
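The option handling in the constructor can be tried in isolation. In this sketch the helper name pool_limits is hypothetical, and ArgumentError stands in for Sequel::Error so that the snippet runs without Sequel loaded. It shows that string values are coerced and that a non-positive :max_connections is rejected.

```ruby
# Hypothetical helper mirroring the coercion and validation shown above.
def pool_limits(opts)
  max_size = Integer(opts[:max_connections] || 4)
  # ArgumentError is a stand-in for Sequel::Error in this sketch.
  raise ArgumentError, ':max_connections must be positive' if max_size < 1
  timeout = Float(opts[:pool_timeout] || 5)
  [max_size, timeout]
end

defaults = pool_limits({})
coerced = pool_limits(max_connections: '8', pool_timeout: '2.5')

rejected =
  begin
    pool_limits(max_connections: 0)
    false
  rescue ArgumentError
    true
  end
```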
Instance Attribute Details
#max_size ⇒ Object (readonly)
The maximum number of connections this pool will create per shard.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 11
    def max_size
      @max_size
    end
Instance Method Details
#add_servers(servers) ⇒ Object
Adds new servers to the connection pool. Allows for dynamic expansion of the potential replicas/shards at runtime. The servers argument should be an array of symbols.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 43
    def add_servers(servers)
      sync do
        servers.each do |server|
          next if @servers.has_key?(server)

          @servers[server] = server
          @sizes[server] = 0
          @queues[server] = Queue.new
          (@allocated[server] = {}).compare_by_identity
        end
      end
      nil
    end
#all_connections ⇒ Object
Yield all of the available connections, and the one currently allocated to this thread (if one is allocated). This will not yield connections currently allocated to other threads, as it is not safe to operate on them.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 60
    def all_connections
      thread = Sequel.current
      sync{@queues.to_a}.each do |server, queue|
        if conn = owned_connection(thread, server)
          yield conn
        end

        # Use a hash to record all connections already seen. As soon as we
        # come across a connection we've already seen, we stop the loop.
        conns = {}
        conns.compare_by_identity
        while true
          conn = nil
          begin
            break unless (conn = queue.pop(timeout: 0)) && !conns[conn]
            conns[conn] = true
            yield conn
          ensure
            queue.push(conn) if conn
          end
        end
      end

      nil
    end
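The pop-and-requeue loop in all_connections can be reproduced with a plain Queue. The sketch below assumes Ruby 3.2+ (for Queue#pop with timeout:) and uses strings as stand-in connections; it visits each queued item once and leaves the queue with the same number of entries.

```ruby
# Requires Ruby 3.2+ for Queue#pop(timeout:).
queue = Queue.new
3.times { |i| queue.push("conn#{i}") } # strings stand in for connections

# Identity comparison: two equal-looking objects are still distinct connections.
seen = {}.compare_by_identity
visited = []

loop do
  conn = nil
  begin
    # timeout: 0 returns nil immediately when the queue is empty.
    break unless (conn = queue.pop(timeout: 0)) && !seen[conn]
    seen[conn] = true
    visited << conn
  ensure
    # Always return the popped item, including the repeat that ends the loop.
    queue.push(conn) if conn
  end
end
```

Because every popped item is pushed back in the ensure clause, the traversal stops as soon as the first item comes around again.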
#disconnect(opts = OPTS) ⇒ Object
Removes all connections currently in the pool’s queue. This method has the effect of disconnecting from the database, assuming that no connections are currently being used.
Once a connection is requested using #hold, the connection pool creates new connections to the database.
If the :server option is provided, it should be a symbol or array of symbols, and the method will then only disconnect connections from the specified shards.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 95
    def disconnect(opts=OPTS)
      (opts[:server] ? Array(opts[:server]) : sync{@servers.keys}).each do |server|
        raise Sequel::Error, "invalid server" unless queue = sync{@queues[server]}
        while conn = queue.pop(timeout: 0)
          disconnect_pool_connection(conn, server)
        end
        fill_queue(server)
      end
      nil
    end
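As a rough illustration of how the :server option narrows the set of shards, the sketch below drains plain Queue objects keyed by shard name. The helper drain_idle is hypothetical, ArgumentError stands in for Sequel::Error, and symbols stand in for connections; Ruby 3.2+ is assumed.

```ruby
# Requires Ruby 3.2+ for Queue#pop(timeout:).
# Hypothetical helper: empties the idle queues for the selected shards and
# returns what it removed (a real pool would disconnect each connection).
def drain_idle(queues, server_opt = nil)
  # Array() accepts either a single symbol or an array of symbols.
  targets = server_opt ? Array(server_opt) : queues.keys
  removed = []
  targets.each do |server|
    queue = queues[server]
    raise ArgumentError, "invalid server" unless queue
    # Non-blocking pop: returns nil once the queue is empty.
    while (conn = queue.pop(timeout: 0))
      removed << conn
    end
  end
  removed
end

queues = { default: Queue.new, replica: Queue.new }
queues[:default].push(:d1)
queues[:replica].push(:r1)
queues[:replica].push(:r2)

only_replica = drain_idle(queues, :replica)
remaining_default = queues[:default].size
everything = drain_idle(queues)
```

Only queued (idle) items are touched, which matches the note above that the method disconnects fully only when no connections are currently in use.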
#hold(server = :default) ⇒ Object
Chooses the first available connection for the given server, or if none are available, creates a new connection. Passes the connection to the supplied block:
pool.hold(:server1) {|conn| conn.execute('DROP TABLE posts')}
Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.
If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.
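The blocking-with-timeout behavior rests on the timed queue. A minimal sketch, assuming Ruby 3.2+: PoolTimeoutSketch is a hypothetical stand-in for Sequel::PoolTimeout and checkout is an illustrative helper, not part of the pool's API.

```ruby
# Requires Ruby 3.2+ for Queue#pop(timeout:).
class PoolTimeoutSketch < StandardError; end # stand-in for Sequel::PoolTimeout

# Hypothetical helper: wait up to `timeout` seconds for a queued connection.
def checkout(queue, timeout)
  conn = queue.pop(timeout: timeout) # nil if nothing arrives in time
  raise PoolTimeoutSketch, "timeout: #{timeout}" unless conn
  conn
end

queue = Queue.new
queue.push(:conn1)

first = checkout(queue, 0.05) # a connection is available immediately

timed_out =
  begin
    checkout(queue, 0.05) # queue is now empty, so this waits and then fails
    false
  rescue PoolTimeoutSketch
    true
  end
```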
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 119
    def hold(server=:default)
      server = pick_server(server)
      t = Sequel.current
      if conn = owned_connection(t, server)
        return yield(conn)
      end

      begin
        conn = acquire(t, server)
        yield conn
      rescue Sequel::DatabaseDisconnectError, *@error_classes => e
        if disconnect_error?(e)
          oconn = conn
          conn = nil
          disconnect_pool_connection(oconn, server) if oconn
          sync{@allocated[server].delete(t)}
          fill_queue(server)
        end
        raise
      ensure
        release(t, conn, server) if conn
      end
    end
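The re-entrant path at the top of hold can be illustrated with a deliberately tiny, single-shard pool. TinyPool is a hypothetical class written for this sketch; it keys ownership on Thread.current, which is an assumption standing in for Sequel.current, and it omits timeouts and disconnect handling.

```ruby
# Hypothetical single-shard pool showing only the re-entrancy check.
class TinyPool
  def initialize(conns)
    @queue = Queue.new
    conns.each { |c| @queue.push(c) }
    @allocated = {}.compare_by_identity # thread => connection it holds
    @mutex = Mutex.new
  end

  def hold
    t = Thread.current
    owned = @mutex.synchronize { @allocated[t] }
    # Re-entrant case: this thread already holds a connection, so reuse it.
    return yield(owned) if owned

    conn = @queue.pop
    @mutex.synchronize { @allocated[t] = conn }
    begin
      yield conn
    ensure
      @mutex.synchronize { @allocated.delete(t) }
      @queue.push(conn)
    end
  end

  def idle_count
    @queue.size
  end
end

pool = TinyPool.new([:only_conn])
# With a single connection, the nested call would block forever if hold
# were not re-entrant.
nested = pool.hold { |outer| pool.hold { |inner| [outer, inner] } }
idle_after = pool.idle_count
```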
#pool_type ⇒ Object
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 190
    def pool_type
      :sharded_timed_queue
    end
#remove_servers(servers) ⇒ Object
Remove servers from the connection pool. Similar to disconnecting from all given servers, except that after it is used, future requests for the servers will use the :default server instead.
Note that an error will be raised if there are any connections currently checked out for the given servers.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 154
    def remove_servers(servers)
      conns = []
      raise(Sequel::Error, "cannot remove default server") if servers.include?(:default)

      sync do
        servers.each do |server|
          next unless @servers.has_key?(server)

          queue = @queues[server]

          while conn = queue.pop(timeout: 0)
            @sizes[server] -= 1
            conns << conn
          end

          unless @sizes[server] == 0
            raise Sequel::Error, "cannot remove server #{server} as it has allocated connections"
          end

          @servers.delete(server)
          @sizes.delete(server)
          @queues.delete(server)
          @allocated.delete(server)
        end
      end

      nil
    ensure
      disconnect_connections(conns)
    end
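The fallback to :default after removal follows from the default servers hash, Hash.new(:default), assuming shard lookups go through that hash. A plain-Ruby sketch of the effect, with no Sequel objects involved:

```ruby
# Same default as the constructor uses when :servers_hash is not given.
servers = Hash.new(:default)
%i[default replica].each { |name| servers[name] = name }

before_removal = servers[:replica] # registered shard resolves to itself

servers.delete(:replica)
after_removal = servers[:replica]  # missing key falls back to :default
```

With a custom :servers_hash whose default proc raises, the same lookup would raise instead of falling back.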
#servers ⇒ Object
Return an array of symbols for servers in the connection pool.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 186
    def servers
      sync{@servers.keys}
    end
#size(server = :default) ⇒ Object
The total number of connections in the pool for the given server. Using a nonexistent server will return nil.
    # File 'lib/sequel/connection_pool/sharded_timed_queue.rb', line 144
    def size(server=:default)
      sync{@sizes[server]}
    end