Class: Sequel::ConnectionPool

Inherits:
Object
  • Object
show all
Defined in:
lib/sequel/lib/sequel/connection_pool.rb

Overview

A ConnectionPool manages access to database connections by keeping multiple connections and giving threads exclusive access to each connection.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}, &block) ⇒ ConnectionPool

Constructs a new pool with a maximum size. If a block is supplied, it is used to create new connections as they are needed.

pool = ConnectionPool.new(:max_connections=>10) {MyConnection.new(opts)}

The connection creation proc can be changed at any time by assigning a Proc to pool#connection_proc.

pool = ConnectionPool.new(:max_connections=>10)
pool.connection_proc = proc {MyConnection.new(opts)}

The connection pool takes the following options:

  • :disconnection_proc - The proc called when removing connections from the pool.

  • :max_connections - The maximum number of connections the connection pool will open (default 4)

  • :pool_convert_exceptions - Whether to convert non-StandardError based exceptions to RuntimeError exceptions (default true)

  • :pool_sleep_time - The amount of time to sleep before attempting to acquire a connection again (default 0.001)

  • :pool_timeout - The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5)

  • :servers - A hash of servers to use. Keys should be symbols. If not present, will use a single :default server. The server name symbol will be passed to the connection_proc.

Raises:


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 43

def initialize(opts = {}, &block)
  @max_size = Integer(opts[:max_connections] || 4)
  raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
  @mutex = Mutex.new
  @connection_proc = block
  @disconnection_proc = opts[:disconnection_proc]
  @servers = [:default]
  @servers += opts[:servers].keys - @servers if opts[:servers] 
  @available_connections = Hash.new{|h,k| h[:default]}
  @allocated = Hash.new{|h,k| h[:default]}
  @servers.each do |s|
    @available_connections[s] = []
    @allocated[s] = {}
  end
  @timeout = Integer(opts[:pool_timeout] || 5)
  @sleep_time = Float(opts[:pool_sleep_time] || 0.001)
  @convert_exceptions = opts.include?(:pool_convert_exceptions) ? opts[:pool_convert_exceptions] : true
end

Instance Attribute Details

#connection_procObject

The proc used to create a new database connection.


6
7
8
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 6

def connection_proc
  @connection_proc
end

#disconnection_procObject

The proc used to disconnect a database connection.


9
10
11
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 9

def disconnection_proc
  @disconnection_proc
end

#max_sizeObject (readonly)

The maximum number of connections.


12
13
14
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 12

def max_size
  @max_size
end

#mutexObject (readonly)

The mutex that protects access to the other internal vairables. You must use this if you want to manipulate the variables safely.


16
17
18
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 16

def mutex
  @mutex
end

Instance Method Details

#allocated(server = :default) ⇒ Object

A hash of connections currently being used for the given server, key is the Thread, value is the connection.


64
65
66
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 64

def allocated(server=:default)
  @allocated[server]
end

#available_connections(server = :default) ⇒ Object

An array of connections opened but not currently used, for the given server.


70
71
72
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 70

def available_connections(server=:default)
  @available_connections[server]
end

#created_count(server = :default) ⇒ Object Also known as: size

The total number of connections opened for the given server, should be equal to available_connections.length + allocated.length


76
77
78
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 76

def created_count(server=:default)
  @allocated[server].length + @available_connections[server].length
end

#disconnect(&block) ⇒ Object

Removes all connection currently available on all servers, optionally yielding each connection to the given block. This method has the effect of disconnecting from the database, assuming that no connections are currently being used. Once a connection is requested using #hold, the connection pool creates new connections to the database.


131
132
133
134
135
136
137
138
139
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 131

def disconnect(&block)
  block ||= @disconnection_proc
  @mutex.synchronize do
    @available_connections.each do |server, conns|
      conns.each{|c| block.call(c)} if block
      conns.clear
    end
  end
end

#hold(server = :default) ⇒ Object

Chooses the first available connection to the given server, or if none are available, creates a new connection. Passes the connection to the supplied block:

pool.hold {|conn| conn.execute('DROP TABLE posts')}

Pool#hold is re-entrant, meaning it can be called recursively in the same thread without blocking.

If no connection is immediately available and the pool is already using the maximum number of connections, Pool#hold will block until a connection is available or the timeout expires. If the timeout expires before a connection can be acquired, a Sequel::PoolTimeout is raised.


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/sequel/lib/sequel/connection_pool.rb', line 95

def hold(server=:default)
  begin
    t = Thread.current
    if conn = owned_connection(t, server)
      return yield(conn)
    end
    begin
      unless conn = acquire(t, server)
        time = Time.new
        timeout = time + @timeout
        sleep_time = @sleep_time
        sleep sleep_time
        until conn = acquire(t, server)
          raise(::Sequel::PoolTimeout) if Time.new > timeout
          sleep sleep_time
        end
      end
      yield conn
    rescue Sequel::DatabaseDisconnectError => dde
      remove(t, conn, server) if conn
      raise
    ensure
      @mutex.synchronize{release(t, server)} if conn && !dde
    end
  rescue StandardError 
    raise
  rescue Exception => e
    raise(@convert_exceptions ? RuntimeError.new(e.message) : e)
  end
end