Module: Rage::Ext::ActiveRecord::ConnectionPool
- Includes:
- ConnectionWithVerify
- Defined in:
- lib/rage/ext/active_record/connection_pool.rb
Defined Under Namespace
Modules: ConnectionWithVerify Classes: BlackHoleList
Instance Method Summary collapse
- #__init_rage_extension ⇒ Object
-
#active_connection? ⇒ Boolean
Returns true if there is an open connection being used for the current fiber.
-
#checkin(conn) ⇒ Object
Check in a database connection back into the pool, indicating that you no longer need this connection.
-
#checkout(_ = nil) ⇒ Object
Check out a database connection from the pool, indicating that you want to use it.
- #clear_reloadable_connections(raise_on_acquisition_timeout = true) ⇒ Object
- #clear_reloadable_connections! ⇒ Object
-
#connected? ⇒ Boolean
Returns true if a connection has already been opened.
-
#connection ⇒ Object
Retrieve the connection associated with the current fiber, or obtain one if necessary.
-
#connections ⇒ Object
Returns an array containing the connections currently in the pool.
-
#discard! ⇒ Object
Discards all connections in the pool (even if they're currently in use!), along with the pool itself.
- #discarded? ⇒ Boolean
-
#disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) ⇒ Object
Disconnects all connections in the pool, and clears the pool.
-
#disconnect! ⇒ Object
Disconnects all connections in the pool, and clears the pool.
-
#flush(minimum_idle = @__idle_timeout) ⇒ Object
Disconnect all connections that have been idle for at least minimum_idle seconds.
-
#flush! ⇒ Object
Disconnect all currently idle connections.
- #lease_connection ⇒ Object
- #num_waiting_in_queue ⇒ Object
-
#reap ⇒ Object
Recover lost connections for the pool.
-
#release_connection(owner = Fiber.current) ⇒ Object
Signal that the fiber is finished with the current connection and it can be returned to the pool.
-
#remove(conn) ⇒ Object
Remove a connection from the connection pool.
-
#stat ⇒ Object
Returns the connection pool's usage statistics.
-
#with_connection(_ = nil) ⇒ Object
Yields a connection from the connection pool to the block.
Instance Method Details
#__init_rage_extension ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 53
#
# Initializes the fiber-aware pool state and registers the Iodine timers and
# subscriptions that drive it.
#
# State created here:
# * `@__blocked`  - fibers waiting for a connection, `{ Fiber => timestamp }`
# * `@__in_use`   - fibers holding a connection, `{ Fiber => Connection }`
# * `@__connections` - idle connections, as returned by `build_new_connections`
#
# Background work registered here:
# * a timer that raises `ActiveRecord::ConnectionTimeoutError` in fibers that
#   have been blocked for longer than `@__checkout_timeout`;
# * optionally, a once-per-second health check over the idle connections;
# * a subscription that resumes one blocked fiber per "connection released"
#   message, and a shutdown hook that removes that subscription.
def __init_rage_extension
  # a map of fibers that are currently waiting for a
  # connection in the format of { Fiber => timestamp }
  @__blocked = {}

  # a map of fibers that are currently holding connections
  # in the format of { Fiber => Connection }
  @__in_use = {}

  # a list of all DB connections that are currently idle
  @__connections = build_new_connections

  # how long a fiber can wait for a connection to become available
  @__checkout_timeout = checkout_timeout

  # how long a connection can be idle for before disconnecting
  @__idle_timeout = respond_to?(:db_config) ? db_config.idle_timeout : @idle_timeout

  # how often should we check for fibers that wait for a connection for too long
  @__timeout_worker_frequency = 0.5

  # reject fibers that wait for a connection for more than `@__checkout_timeout`
  Iodine.run_every((@__timeout_worker_frequency * 1_000).to_i) do
    if @__blocked.length > 0
      current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      # Snapshot the expired fibers before touching any of them. `Fiber#raise`
      # runs the target fiber synchronously; if that fiber rescues the error
      # and queues for a connection again, it inserts into `@__blocked`, which
      # Ruby forbids while the hash is being iterated.
      expired = @__blocked.filter_map do |fiber, blocked_since|
        fiber if (current_time - blocked_since) > @__checkout_timeout
      end

      expired.each do |fiber|
        # skip fibers that left the queue while an earlier one was running
        next unless @__blocked.delete(fiber)

        fiber.raise(ActiveRecord::ConnectionTimeoutError, "could not obtain a connection from the pool within #{@__checkout_timeout} seconds; all pooled connections were in use")
      end
    end
  end

  # monitor connections health
  if Rage.config.internal.should_manually_restore_ar_connections?
    Iodine.run_every(1_000) do
      i = 0
      while i < @__connections.length
        conn = @__connections[i]
        unless conn.__needs_reconnect
          # a failing health check is treated the same as an inactive connection
          needs_reconnect = !conn.active? rescue true
          if needs_reconnect
            conn.__needs_reconnect = true
            conn.disconnect!
          end
        end
        i += 1
      end
    end
  end

  @release_connection_channel = "ext:ar-connection-released:#{object_id}"

  # resume blocked fibers once connections become available
  Iodine.subscribe(@release_connection_channel) do
    if @__blocked.length > 0 && @__connections.length > 0
      f, _ = @__blocked.shift
      f.resume
    end
  end

  # unsubscribe on shutdown
  Iodine.on_state(:on_finish) do
    Iodine.unsubscribe(@release_connection_channel)
  end
end
#active_connection? ⇒ Boolean
Returns true if there is an open connection being used for the current fiber.
124 125 126 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 124 def active_connection? @__in_use[Fiber.current] end |
#checkin(conn) ⇒ Object
Check in a database connection back into the pool, indicating that you no longer need this connection.
292 293 294 295 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 292
#
# Returns `conn` to the pool by releasing it on behalf of whichever fiber is
# recorded as its holder in `@__in_use`.
#
# @param conn [Object] the connection to hand back
# @return [Object, nil] whatever `release_connection` returns for that holder
def checkin(conn)
  # `Hash#key` yields nil for an unknown connection; that nil is passed on as the owner
  release_connection(@__in_use.key(conn))
end
#checkout(_ = nil) ⇒ Object
Check out a database connection from the pool, indicating that you want to use it. You should call #checkin when you no longer need this.
283 284 285 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 283 def checkout(_ = nil) connection end |
#clear_reloadable_connections(raise_on_acquisition_timeout = true) ⇒ Object
305 306 307 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 305 def clear_reloadable_connections(raise_on_acquisition_timeout = true) disconnect(raise_on_acquisition_timeout) end |
#clear_reloadable_connections! ⇒ Object
309 310 311 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 309 def clear_reloadable_connections! disconnect(false) end |
#connected? ⇒ Boolean
Returns true if a connection has already been opened.
216 217 218 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 216 def connected? true end |
#connection ⇒ Object
Retrieve the connection associated with the current fiber, or obtain one if necessary.
129 130 131 132 133 134 135 136 137 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 129
#
# Returns the connection leased to the current fiber, leasing one first when
# the fiber does not hold any.
#
# When no idle connection is available, the fiber records itself in
# `@__blocked` with the current monotonic time and yields. Once resumed, it
# takes whatever is at the head of `@__connections`.
#
# @return [Object, nil] the connection now recorded for this fiber in `@__in_use`
def connection
  owner = Fiber.current

  leased = @__in_use[owner]
  return leased if leased

  conn = @__connections.shift

  unless conn
    # nothing idle: park this fiber until something resumes it
    @__blocked[owner] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    Fiber.yield
    conn = @__connections.shift
  end

  @__in_use[owner] = conn
end
#connections ⇒ Object
Returns an array containing the connections currently in the pool.
211 212 213 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 211 def connections @__connections.to_a end |
#discard! ⇒ Object
Discards all connections in the pool (even if they're currently in use!), along with the pool itself. Any further interaction with the pool is undefined.
319 320 321 322 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 319
#
# Flags the pool as discarded and calls `discard!` on every connection it
# knows about, idle or leased.
#
# @return [Array] the combined list of connections that were discarded
def discard!
  @__discarded = true

  every_connection = @__connections + @__in_use.values
  every_connection.each(&:discard!)
end
#discarded? ⇒ Boolean
324 325 326 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 324
#
# Reports whether the `@__discarded` flag is set, coerced to a strict boolean.
#
# @return [Boolean]
def discarded?
  @__discarded ? true : false
end
#disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) ⇒ Object
Disconnects all connections in the pool, and clears the pool.
Raises ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all connections in the pool within a timeout interval (default duration is checkout_timeout * 2 seconds).
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 236 def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) # allow request fibers to release connections, but block from acquiring new ones if disconnect_attempts == 0 @__connections = BlackHoleList.new(@__connections) end # if some connections are in use, we will wait for up to `@__checkout_timeout * 2` seconds if @__in_use.length > 0 && disconnect_attempts <= @__checkout_timeout * 4 Iodine.run_after(500) { disconnect(raise_on_acquisition_timeout, disconnect_attempts + 1) } return end pool_connections = @__connections.to_a # check if there are still some connections in use if @__in_use.length > 0 raise(ActiveRecord::ExclusiveConnectionTimeoutError, "could not obtain ownership of all database connections") if raise_on_acquisition_timeout pool_connections += @__in_use.values @__in_use.clear end # disconnect all connections pool_connections.each do |conn| conn.disconnect! __remove__(conn) end # create a new pool self.automatic_reconnect = true @__connections = build_new_connections # notify blocked fibers that there are new connections available [@__blocked.length, @__connections.length].min.times do Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) end end |
#disconnect! ⇒ Object
Disconnects all connections in the pool, and clears the pool.
The pool first tries to gain ownership of all connections. If unable to do so within a timeout interval (default duration is checkout_timeout * 2 seconds), then the pool is forcefully disconnected without any regard for other connection-owning fibers.
277 278 279 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 277 def disconnect! disconnect(false) end |
#flush(minimum_idle = @__idle_timeout) ⇒ Object
Disconnect all connections that have been idle for at least minimum_idle seconds. Connections currently checked out, or that were checked in less than minimum_idle seconds ago, are unaffected.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 178
#
# Disconnects idle connections whose `__idle_since` timestamp is at least
# `minimum_idle` seconds old. Each such connection is marked with
# `__needs_reconnect = true` and has its `__idle_since` cleared, so it is not
# picked up again by a later flush.
#
# @param minimum_idle [Numeric, nil] idle threshold in seconds; nil makes the
#   call a no-op
# @return [nil]
def flush(minimum_idle = @__idle_timeout)
  return if minimum_idle.nil? || @__connections.length == 0

  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  index = 0

  # index-based walk: the list is only accessed through `length` and `[]`
  until index >= @__connections.length
    candidate = @__connections[index]
    index += 1

    # connections without a timestamp are skipped
    next unless candidate.__idle_since && now - candidate.__idle_since >= minimum_idle

    candidate.__idle_since = nil
    candidate.__needs_reconnect = true
    candidate.disconnect!
  end
end
#flush! ⇒ Object
Disconnect all currently idle connections. Connections currently checked out are unaffected.
194 195 196 197 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 194 def flush! reap flush(-1) end |
#lease_connection ⇒ Object
287 288 289 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 287 def lease_connection connection end |
#num_waiting_in_queue ⇒ Object
313 314 315 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 313 def num_waiting_in_queue @__blocked.length end |
#reap ⇒ Object
Recover lost connections for the pool.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 151 def reap crashed_fibers = nil @__in_use.each do |fiber, conn| unless fiber.alive? if conn.active? conn.reset! (crashed_fibers ||= []) << fiber else @__in_use.delete(fiber) conn.disconnect! __remove__(conn) self.automatic_reconnect = true @__connections += build_new_connections(1) Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 end end end if crashed_fibers crashed_fibers.each { |fiber| release_connection(fiber) } end end |
#release_connection(owner = Fiber.current) ⇒ Object
Signal that the fiber is finished with the current connection and it can be returned to the pool.
140 141 142 143 144 145 146 147 148 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 140
#
# Returns the connection leased to `owner` to the idle list and, when fibers
# are waiting, publishes a release message on `@release_connection_channel`.
#
# @param owner [Fiber] the fiber whose lease should end; defaults to the
#   current fiber
# @return [Object, nil] the released connection, or nil if `owner` held none
def release_connection(owner = Fiber.current)
  released = @__in_use.delete(owner)
  return released unless released

  # stamp the moment the connection became idle
  released.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @__connections << released

  unless @__blocked.length.zero?
    Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS)
  end

  released
end
#remove(conn) ⇒ Object
Remove a connection from the connection pool. The connection will remain open and active but will no longer be managed by this pool.
299 300 301 302 303 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 299
#
# Stops managing `conn`: calls `__remove__`, then drops the connection from
# both the leased map and the idle list.
#
# @param conn [Object] the connection to remove
# @return [Object, nil] the result of deleting `conn` from `@__connections`
def remove(conn)
  __remove__(conn)

  # drop every lease that points at this connection
  @__in_use.reject! { |_, leased| leased == conn }

  @__connections.delete(conn)
end
#stat ⇒ Object
Returns the connection pool's usage statistics.
221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 221
#
# Builds a snapshot of pool usage.
#
# @return [Hash] with keys `:size`, `:connections`, `:busy`, `:dead`, `:idle`,
#   `:waiting` and `:checkout_timeout`; `:busy` and `:dead` split the leased
#   connections by whether the owning fiber is still alive
def stat
  holders = @__in_use.keys
  alive_total = holders.count(&:alive?)
  dead_total = holders.length - alive_total

  {
    size: size,
    connections: size,
    busy: alive_total,
    dead: dead_total,
    idle: @__connections.length,
    waiting: @__blocked.length,
    checkout_timeout: @__checkout_timeout
  }
end
#with_connection(_ = nil) ⇒ Object
Yields a connection from the connection pool to the block.
200 201 202 203 204 205 206 207 208 |
# File 'lib/rage/ext/active_record/connection_pool.rb', line 200
#
# Yields a connection to the block. A connection already leased to the current
# fiber is reused and left leased. Otherwise one is obtained through
# `connection` and released again once the block finishes, whether it returns
# or raises.
#
# @param _ [Object] accepted for signature compatibility and ignored
# @yieldparam conn [Object] the connection to use
# @return [Object] the block's return value
def with_connection(_ = nil)
  already_leased = @__in_use[Fiber.current]
  return yield(already_leased) if already_leased

  # leased only for the duration of the block
  borrowed = connection
  begin
    yield borrowed
  ensure
    release_connection
  end
end