Module: Rage::Ext::ActiveRecord::ConnectionPool
- Defined in:
- lib/rage/ext/active_record/connection_pool.rb
Defined Under Namespace
Classes: BlackHoleList
Instance Method Summary collapse
- #__init_rage_extension ⇒ Object
-
#active_connection? ⇒ Boolean
Returns true if there is an open connection being used for the current fiber.
-
#checkin(conn) ⇒ Object
Check in a database connection back into the pool, indicating that you no longer need this connection.
-
#checkout(_ = nil) ⇒ Object
Check out a database connection from the pool, indicating that you want to use it.
- #clear_reloadable_connections(raise_on_acquisition_timeout = true) ⇒ Object
- #clear_reloadable_connections! ⇒ Object
-
#connected? ⇒ Boolean
Returns true if a connection has already been opened.
-
#connection ⇒ Object
Retrieve the connection associated with the current fiber, or obtain one if necessary.
-
#connections ⇒ Object
Returns an array containing the connections currently in the pool.
-
#discard! ⇒ Object
Discards all connections in the pool (even if they’re currently in use!), along with the pool itself.
- #discarded? ⇒ Boolean
-
#disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) ⇒ Object
Disconnects all connections in the pool, and clears the pool.
-
#disconnect! ⇒ Object
Disconnects all connections in the pool, and clears the pool.
-
#flush(minimum_idle = @__idle_timeout) ⇒ Object
Disconnect all connections that have been idle for at least
minimum_idle
seconds. -
#flush! ⇒ Object
Disconnect all currently idle connections.
- #num_waiting_in_queue ⇒ Object
-
#reap ⇒ Object
Recover lost connections for the pool.
-
#release_connection(owner = Fiber.current) ⇒ Object
Signal that the fiber is finished with the current connection and it can be returned to the pool.
-
#remove(conn) ⇒ Object
Remove a connection from the connection pool.
-
#stat ⇒ Object
Return connection pool’s usage statistic.
-
#with_connection ⇒ Object
Yields a connection from the connection pool to the block.
Instance Method Details
#__init_rage_extension ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 34 def __init_rage_extension # a map of fibers that are currently waiting for a # connection in the format of { Fiber => timestamp } @__blocked = {} # a map of fibers that are currently hodling connections # in the format of { Fiber => Connection } @__in_use = {} # a list of all DB connections that are currently idle @__connections = build_new_connections # how long a fiber can wait for a connection to become available @__checkout_timeout = checkout_timeout # how long a connection can be idle for before disconnecting @__idle_timeout = reaper.frequency # how often should we check for fibers that wait for a connection for too long @__timeout_worker_frequency = 0.5 # reject fibers that wait for a connection for more than `@__checkout_timeout` Iodine.run_every((@__timeout_worker_frequency * 1_000).to_i) do if @__blocked.length > 0 current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) @__blocked.each do |fiber, blocked_since| if (current_time - blocked_since) > @__checkout_timeout @__blocked.delete(fiber) fiber.raise(ActiveRecord::ConnectionTimeoutError, "could not obtain a connection from the pool within #{@__checkout_timeout} seconds; all pooled connections were in use") end end end end # resume blocked fibers once connections become available Iodine.subscribe("ext:ar-connection-released") do if @__blocked.length > 0 && @__connections.length > 0 f, _ = @__blocked.shift f.resume end end # unsubscribe on shutdown Iodine.on_state(:on_finish) do Iodine.unsubscribe("ext:ar-connection-released") end end |
#active_connection? ⇒ Boolean
Returns true if there is an open connection being used for the current fiber.
83 84 85 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 83 def active_connection? @__in_use[Fiber.current] end |
#checkin(conn) ⇒ Object
Check in a database connection back into the pool, indicating that you no longer need this connection.
234 235 236 237 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 234 def checkin(conn) fiber = @__in_use.key(conn) release_connection(fiber) end |
#checkout(_ = nil) ⇒ Object
Check out a database connection from the pool, indicating that you want to use it. You should call #checkin when you no longer need this.
229 230 231 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 229 def checkout(_ = nil) connection end |
#clear_reloadable_connections(raise_on_acquisition_timeout = true) ⇒ Object
247 248 249 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 247 def clear_reloadable_connections(raise_on_acquisition_timeout = true) disconnect(raise_on_acquisition_timeout) end |
#clear_reloadable_connections! ⇒ Object
251 252 253 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 251 def clear_reloadable_connections! disconnect(false) end |
#connected? ⇒ Boolean
Returns true if a connection has already been opened.
163 164 165 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 163 def connected? true end |
#connection ⇒ Object
Retrieve the connection associated with the current fiber, or obtain one if necessary.
88 89 90 91 92 93 94 95 96 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 88 def connection @__in_use[Fiber.current] ||= @__connections.shift || begin fiber, blocked_since = Fiber.current, Process.clock_gettime(Process::CLOCK_MONOTONIC) @__blocked[fiber] = blocked_since Fiber.yield @__connections.shift end end |
#connections ⇒ Object
Returns an array containing the connections currently in the pool.
158 159 160 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 158 def connections @__connections.to_a end |
#discard! ⇒ Object
Discards all connections in the pool (even if they’re currently in use!), along with the pool itself. Any further interaction with the pool is undefined.
261 262 263 264 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 261 def discard! @__discarded = true (@__connections + @__in_use.values).each { |conn| conn.discard! } end |
#discarded? ⇒ Boolean
266 267 268 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 266 def discarded? !!@__discarded end |
#disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) ⇒ Object
Disconnects all connections in the pool, and clears the pool. Raises ActiveRecord::ExclusiveConnectionTimeoutError
if unable to gain ownership of all connections in the pool within a timeout interval (default duration is checkout_timeout * 2
seconds).
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 183 def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) # allow request fibers to release connections, but block from acquiring new ones if disconnect_attempts == 0 @__connections = BlackHoleList.new(@__connections) end # if some connections are in use, we will wait for up to `@__checkout_timeout * 2` seconds if @__in_use.length > 0 && disconnect_attempts <= @__checkout_timeout * 4 Iodine.run_after(500) { disconnect(raise_on_acquisition_timeout, disconnect_attempts + 1) } return end pool_connections = @__connections.to_a # check if there are still some connections in use if @__in_use.length > 0 raise(ActiveRecord::ExclusiveConnectionTimeoutError, "could not obtain ownership of all database connections") if raise_on_acquisition_timeout pool_connections += @__in_use.values @__in_use.clear end # disconnect all connections pool_connections.each do |conn| conn.disconnect! __remove__(conn) end # create a new pool @__connections = build_new_connections # notify blocked fibers that there are new connections available [@__blocked.length, @__connections.length].min.times do Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) end end |
#disconnect! ⇒ Object
Disconnects all connections in the pool, and clears the pool. The pool first tries to gain ownership of all connections. If unable to do so within a timeout interval (default duration is checkout_timeout * 2
seconds), then the pool is forcefully disconnected without any regard for other connection owning fibers.
223 224 225 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 223 def disconnect! disconnect(false) end |
#flush(minimum_idle = @__idle_timeout) ⇒ Object
Disconnect all connections that have been idle for at least minimum_idle
seconds. Connections currently checked out, or that were checked in less than minimum_idle
seconds ago, are unaffected.
130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 130 def flush(minimum_idle = @__idle_timeout) return if minimum_idle.nil? || @__connections.length == 0 current_time, i = Process.clock_gettime(Process::CLOCK_MONOTONIC), 0 while i < @__connections.length conn = @__connections[i] if conn.__idle_since && current_time - conn.__idle_since >= minimum_idle conn.__idle_since = nil conn.disconnect! end i += 1 end end |
#flush! ⇒ Object
Disconnect all currently idle connections. Connections currently checked out are unaffected.
145 146 147 148 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 145 def flush! reap flush(-1) end |
#num_waiting_in_queue ⇒ Object
255 256 257 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 255 def num_waiting_in_queue @__blocked.length end |
#reap ⇒ Object
Recover lost connections for the pool.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 110 def reap @__in_use.each do |fiber, conn| unless fiber.alive? if conn.active? conn.reset! release_connection(fiber) else @__in_use.delete(fiber) conn.disconnect! __remove__(conn) @__connections += build_new_connections(1) Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 end end end end |
#release_connection(owner = Fiber.current) ⇒ Object
Signal that the fiber is finished with the current connection and it can be returned to the pool.
99 100 101 102 103 104 105 106 107 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 99 def release_connection(owner = Fiber.current) if conn = @__in_use.delete(owner) conn.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) @__connections << conn Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 end conn end |
#remove(conn) ⇒ Object
Remove a connection from the connection pool. The connection will remain open and active but will no longer be managed by this pool.
241 242 243 244 245 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 241 def remove(conn) __remove__(conn) @__in_use.delete_if { |_, c| c == conn } @__connections.delete(conn) end |
#stat ⇒ Object
Return connection pool’s usage statistic.
168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 168 def stat { size: size, connections: size, busy: @__in_use.count { |fiber, _| fiber.alive? }, dead: @__in_use.count { |fiber, _| !fiber.alive? }, idle: @__connections.length, waiting: @__blocked.length, checkout_timeout: @__checkout_timeout } end |
#with_connection ⇒ Object
Yields a connection from the connection pool to the block.
151 152 153 154 155 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 151 def with_connection yield connection ensure release_connection end |