Class: Sequel::Mysql2::Synchrony::ShardedConnectionPool

Inherits:
ConnectionPool
  • Object
show all
Defined in:
lib/my-sequel-synchrony/connection_pool.rb

Overview

The sharded version of the em-synchrony connection pool is automatically used if a :servers option is passed to Sequel.connect(). It keeps a mapping from server ids to non-sharded connection pools, and delegates calls to them as needed.

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}, &block) ⇒ ShardedConnectionPool

The following additional options are respected:

  • :servers - A hash of servers to use. Keys should be symbols. If not present, will use a single :default server. The server name symbol will be passed to the connection_proc. The values in the hash should be per-server specific options (such as different :hostname or :database)

  • :servers_hash - The base hash to use for the servers. By default, Sequel uses Hash.new(:default). You can use a hash with a default proc that raises an error if you want to catch all cases where a nonexistent server is used.



204
205
206
207
208
209
210
211
212
213
214
# File 'lib/my-sequel-synchrony/connection_pool.rb', line 204

def initialize(opts = {}, &block)
  # NOTE: @initialize_opts is passed to the internal non-sharded connection pools when they are created.
  @initialize_opts = opts.dup
  @initialize_opts.delete(:servers)
  @initialize_opts.delete(:servers_hash)
  @initialize_block = block

  @server_ids = opts[:servers_hash] || Hash.new(:default)
  @server_pools = {}
  add_servers([:default] + (opts[:servers] ? opts[:servers].keys : []))
end

Instance Method Details

#add_servers(servers) ⇒ Object

Adds new servers to the connection pool. Primarily used in conjunction with master/slave or shard configurations. Allows for dynamic expansion of the potential slaves/shards at runtime. servers argument should be an array of symbols.



219
220
221
222
223
224
225
226
227
228
# File 'lib/my-sequel-synchrony/connection_pool.rb', line 219

def add_servers(servers)
  servers.each do |server|
    next if @server_ids.has_key?(server)

    @server_ids[server] = server
    @server_pools[server] = ::Sequel::Mysql2::Synchrony::ConnectionPool.new(@initialize_opts) do
      @initialize_block.call(server) if @initialize_block
    end
  end
end

#disconnect(opts = {}, &block) ⇒ Object

Removes all connections currently available on all servers, optionally yielding each connection to the given block. This method has the effect of disconnecting from the database, assuming that no connections are currently being used. If connections are being used, they are scheduled to be disconnected as soon as they are returned to the pool.

Once a connection is requested using #hold, the connection pool creates new connections to the database. Options:

  • :server - Should be a symbol specifing the server to disconnect from, or an array of symbols to specify multiple servers. If not specified, then all servers are disconnected.



262
263
264
265
266
267
268
# File 'lib/my-sequel-synchrony/connection_pool.rb', line 262

def disconnect(opts = {}, &block)
  servers_to_disconnect = @server_pools.keys
  servers_to_disconnect &= Array(opts[:servers]) if opts[:servers]
  servers_to_disconnect.each do |server_id|
    @server_pools[server_id].disconnect(opts, &block)
  end
end

#hold(server = :default, &block) ⇒ Object

Chooses the first available connection to the given server, or if none are available, creates a new connection. Passes the connection to the supplied block:

pool.hold {|conn| conn.execute('DROP TABLE posts')}

Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.

If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.



284
285
286
# File 'lib/my-sequel-synchrony/connection_pool.rb', line 284

def hold(server = :default, &block)
  @server_pools[@server_ids[server]].hold(nil, &block)
end

#remove_servers(servers) ⇒ Object

Remove servers from the connection pool. Primarily used in conjunction with master/slave or shard configurations. Similar to disconnecting from all given servers, except that after it is used, future requests for the server will use the :default server instead.

Raises:

  • (Sequel::Error)


234
235
236
237
238
239
240
241
242
# File 'lib/my-sequel-synchrony/connection_pool.rb', line 234

def remove_servers(servers)
  raise(Sequel::Error, "cannot remove default server") if servers.include?(:default)
  pools_to_disconnect = []
  servers.each do |server|
    pool = @server_pools.delete(server)
    pools_to_disconnect << pool if pool
  end
  pools_to_disconnect.each { |pool| pool.disconnect }
end

#serversObject

Return an array of symbols for servers in the connection pool.



289
290
291
# File 'lib/my-sequel-synchrony/connection_pool.rb', line 289

def servers
  @server_ids.keys
end

#size(server = :default) ⇒ Object

The total number of connections opened for the given server, should be equal to available_connections.length + allocated.length. Nonexistent servers will return the created count of the default server.



247
248
249
# File 'lib/my-sequel-synchrony/connection_pool.rb', line 247

def size(server = :default)
  @server_pools[@server_ids[server]].size
end