Class: Webhookdb::ConnectionCache
- Inherits:
-
Object
- Object
- Webhookdb::ConnectionCache
- Extended by:
- MethodUtilities
- Includes:
- Appydays::Configurable, Appydays::Loggable, Dbutil
- Defined in:
- lib/webhookdb/connection_cache.rb
Overview
Keep a dynamic cache of open database connections. Very similar in behavior to Sequel::DATABASES, but we need to prune connections that have been inactive for a while.
When ‘borrow’ is called, either a new connection is made, or an existing one is used, for that URL. The connection is yielded to the block.
Then, after the block is called, if ‘prune_interval’ has elapsed since the last prune, prune all connections with 0 current connections, _other than the connection just used_. Because this connection was just used, we assume it will be used again soon.
The idea here is that:
-
We cannot connect to the DB statically; each org can have its own DB, so storing it statically would increase DB connections to the number of orgs in the database.
-
So we replace the organization/synchronization done in Sequel::DATABASES with ConnectionCache.
-
Any number of worker threads need access to the same DB; rather than connecting inline, which is very slow, all DB connections for an org (or across orgs if not in database isolation) can share connections via ConnectionCache.
-
In single-org/db environments, the active organization will always be the same, so the connection is never returned.
-
In multi-org/db environments, busy orgs will likely stay busy. But a reconnect isn’t the end of the world.
-
It seems more efficient to be pessimistic about future use, and prune anything with 0 connections, rather than optimistic, and use an LRU or something similar, since the connections are somewhat expensive resources to keep open for no reason. That said, we could switch this out for an LRU if the pessimistic pruning results in many reconnections. It would also be reasonable to increase the prune interval to avoid disconnecting as frequently.
Note that, due to certain implementation details, such as setting timeouts and automatic transaction handling, we implement our own threaded connection pooling: we use the SingleThreadedConnectionPool in Sequel and manage multiple threads on our own.
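The borrow-then-prune cycle described above can be sketched with a toy model. This is not the real ConnectionCache: the class name ToyConnectionCache, the `now:` parameter, the string placeholder used in place of a real connection, and the body of `prune` are all assumptions made for illustration (the actual `prune` method is not shown on this page).

```ruby
# Toy model of pessimistic pruning. Hypothetical names throughout;
# a plain string stands in for a real Sequel connection.
class ToyConnectionCache
  attr_reader :dbs_for_urls, :last_pruned_at

  def initialize(prune_interval:)
    @mutex = Mutex.new
    @dbs_for_urls = {}
    @prune_interval = prune_interval
    @last_pruned_at = Time.now
  end

  # Yield a cached connection for the url if one is idle, else make a new one.
  # `now:` is injectable only so the example can simulate elapsed time.
  def borrow(url, now: Time.now)
    conn = nil
    @mutex.synchronize do
      loans = @dbs_for_urls[url] ||= {loaned: {}, available: []}
      conn = loans[:available].pop || "conn-to-#{url}"
      loans[:loaned][Thread.current] = conn
    end
    begin
      result = yield conn
    ensure
      # Always hand the connection back, even if the block raised.
      @mutex.synchronize do
        @dbs_for_urls[url][:loaned].delete(Thread.current)
        @dbs_for_urls[url][:available] << conn
      end
    end
    prune(skip_url: url, now: now) if now > @last_pruned_at + @prune_interval
    result
  end

  # Assumed pruning rule: drop every url with nothing on loan,
  # except the url that was just used.
  private def prune(skip_url:, now:)
    @mutex.synchronize do
      @dbs_for_urls.delete_if { |url, loans| url != skip_url && loans[:loaned].empty? }
      @last_pruned_at = now
    end
  end
end

cache = ToyConnectionCache.new(prune_interval: 60)
start = Time.now
cache.borrow("postgres://a", now: start) { |conn| conn }
cache.borrow("postgres://b", now: start) { |conn| conn }
# Two minutes later, borrowing "b" triggers a prune that drops idle "a" but keeps "b".
cache.borrow("postgres://b", now: start + 120) { |conn| conn }
```

In this sketch a busy URL survives pruning simply because it is the one being used when the interval elapses, which mirrors the "busy orgs will likely stay busy" reasoning above.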
Defined Under Namespace
Classes: Available, ReentranceError
Constant Summary
Constants included from Dbutil
Instance Attribute Summary
-
#dbs_for_urls ⇒ Object
Returns the value of attribute dbs_for_urls.
-
#last_pruned_at ⇒ Object
Returns the value of attribute last_pruned_at.
-
#prune_interval ⇒ Object
Returns the value of attribute prune_interval.
Class Method Summary
Instance Method Summary
-
#borrow(url, transaction: false, timeout: nil, &block) ⇒ Object
Connect to the database at the given URL.
-
#disconnect(url) ⇒ Object
Disconnect the cached connection for the given url, if any.
- #force_disconnect_all ⇒ Object
-
#initialize(prune_interval:) ⇒ ConnectionCache
constructor
A new instance of ConnectionCache.
- #next_prune_at ⇒ Object
- #summarize ⇒ Object
Methods included from MethodUtilities
attr_predicate, attr_predicate_accessor, singleton_attr_accessor, singleton_attr_reader, singleton_attr_writer, singleton_method_alias, singleton_predicate_accessor, singleton_predicate_reader
Methods included from Dbutil
borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn
Constructor Details
#initialize(prune_interval:) ⇒ ConnectionCache
Returns a new instance of ConnectionCache.
# File 'lib/webhookdb/connection_cache.rb', line 82
def initialize(prune_interval:)
  @mutex = Mutex.new
  @dbs_for_urls = {}
  @prune_interval = prune_interval
  @last_pruned_at = Time.now
end
Instance Attribute Details
#dbs_for_urls ⇒ Object
Returns the value of attribute dbs_for_urls.
# File 'lib/webhookdb/connection_cache.rb', line 80
def dbs_for_urls
  @dbs_for_urls
end
#last_pruned_at ⇒ Object
Returns the value of attribute last_pruned_at.
# File 'lib/webhookdb/connection_cache.rb', line 80
def last_pruned_at
  @last_pruned_at
end
#prune_interval ⇒ Object
Returns the value of attribute prune_interval.
# File 'lib/webhookdb/connection_cache.rb', line 80
def prune_interval
  @prune_interval
end
Class Method Details
.borrow(url, **kw) ⇒ Object
# File 'lib/webhookdb/connection_cache.rb', line 68
def self.borrow(url, **kw, &)
  return self._instance.borrow(url, **kw, &)
end
.disconnect(url) ⇒ Object
# File 'lib/webhookdb/connection_cache.rb', line 72
def self.disconnect(url)
  self._instance.disconnect(url)
end
.force_disconnect_all ⇒ Object
# File 'lib/webhookdb/connection_cache.rb', line 76
def self.force_disconnect_all
  self._instance.force_disconnect_all
end
Instance Method Details
#borrow(url, transaction: false, timeout: nil, &block) ⇒ Object
Connect to the database at the given URL. borrow is not re-entrant, so if the current thread already owns a connection to the given url, raise a ReentranceError. If the url has a DB not in use by any thread, yield that. If the url has no DBs opened, or all are checked out, create and yield a new connection. See class docs for more details.
# File 'lib/webhookdb/connection_cache.rb', line 116
def borrow(url, transaction: false, timeout: nil, &block)
  raise LocalJumpError if block.nil?
  raise ArgumentError, "url cannot be blank" if url.blank?
  now = Time.now
  if timeout.is_a?(Symbol)
    timeout_name = "timeout_#{timeout}"
    begin
      timeout = Webhookdb::ConnectionCache.send(timeout_name)
    rescue NoMethodError
      raise NoMethodError, "no timeout accessor :#{timeout_name}"
    end
  end
  t = Thread.current
  conn = nil
  @mutex.synchronize do
    db_loans = @dbs_for_urls[url] ||= {loaned: {}, available: []}
    if db_loans[:loaned].key?(t)
      raise ReentranceError,
            "ConnectionCache#borrow is not re-entrant for the same database since the connection has stateful config"
    end
    if (available = db_loans[:available].pop)
      # If the connection doesn't validate, it won't be in :available at this point, so don't worry about it.
      conn = available.validated_connection
    end
    conn ||= take_conn(url, single_threaded: true, extensions: [:pg_json, :pg_streaming])
    db_loans[:loaned][t] = conn
  end
  conn << "SET statement_timeout TO #{timeout * 1000}" if timeout.present?
  conn << "BEGIN;" if transaction
  begin
    result = yield conn
    conn << "COMMIT;" if transaction
  rescue Sequel::DatabaseError
    conn << "ROLLBACK;" if transaction
    raise
  ensure
    conn << "SET statement_timeout TO 0" if timeout.present?
    @mutex.synchronize do
      @dbs_for_urls[url][:loaned].delete(t)
      @dbs_for_urls[url][:available] << Available.new(conn, Time.now)
    end
  end
  self.prune(url) if now > self.next_prune_at
  return result
end
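The timeout handling in borrow accepts either a number of seconds or a Symbol that is resolved through a `timeout_<name>` accessor, then converted to milliseconds for `SET statement_timeout`. The sketch below isolates that step. ToyTimeouts and its `timeout_fast` and `timeout_slow` values are hypothetical stand-ins for whatever accessors ConnectionCache is configured with, and the helper names `resolve_timeout` and `statement_timeout_sql` are invented for this example. A plain nil check is used where the real method calls `present?`.

```ruby
# Hypothetical stand-in for the configured timeout accessors.
module ToyTimeouts
  def self.timeout_fast
    5
  end

  def self.timeout_slow
    30
  end
end

# Resolve a Symbol such as :fast to a number of seconds via "timeout_fast";
# anything that is not a Symbol is passed through unchanged.
def resolve_timeout(timeout, source: ToyTimeouts)
  return timeout unless timeout.is_a?(Symbol)
  name = "timeout_#{timeout}"
  begin
    source.send(name)
  rescue NoMethodError
    raise NoMethodError, "no timeout accessor :#{name}"
  end
end

# Build the statement that borrow would send before yielding, or nil
# when no timeout was requested. Postgres expects milliseconds.
def statement_timeout_sql(timeout)
  seconds = resolve_timeout(timeout)
  return nil if seconds.nil?
  "SET statement_timeout TO #{seconds * 1000}"
end

puts statement_timeout_sql(:fast)
puts statement_timeout_sql(2)
```

An unknown Symbol raises NoMethodError with the accessor name in the message, which matches the error path in the method above.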
#disconnect(url) ⇒ Object
Disconnect the cached connection for the given url, if any. In general, this is only needed when tearing down a database.
# File 'lib/webhookdb/connection_cache.rb', line 166
def disconnect(url)
  raise ArgumentError, "url cannot be blank" if url.blank?
  db_loans = @dbs_for_urls[url]
  return if db_loans.nil?
  if db_loans[:loaned].size.positive?
    raise Webhookdb::InvalidPrecondition,
          "url #{displaysafe_url(url)} still has #{db_loans[:loaned].size} active connections"
  end
  db_loans[:available].each(&:disconnect)
  @dbs_for_urls.delete(url)
end
#force_disconnect_all ⇒ Object
# File 'lib/webhookdb/connection_cache.rb', line 186
def force_disconnect_all
  @dbs_for_urls.each_value do |db_loans|
    db_loans[:available].each(&:disconnect)
    db_loans[:loaned].each_value(&:disconnect)
  end
  @dbs_for_urls.clear
end
#next_prune_at ⇒ Object
# File 'lib/webhookdb/connection_cache.rb', line 162
def next_prune_at = self.last_pruned_at + self.prune_interval
#summarize ⇒ Object
# File 'lib/webhookdb/connection_cache.rb', line 194
def summarize
  return self.dbs_for_urls.transform_values do |loans|
    {loaned: loans[:loaned].size, available: loans[:available].size}
  end
end
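To illustrate the shape of the value summarize returns, the sketch below applies the same transform_values call to a hand-built hash. The URLs and the symbols standing in for threads and connections are made up for the example; in the real cache the :loaned hash is keyed by Thread and :available holds Available instances.

```ruby
# Made-up cache state: one url with a loaned and two idle connections,
# one url with a single idle connection.
dbs_for_urls = {
  "postgres://localhost/org_a" => {loaned: {thread_1: :conn_1}, available: [:conn_2, :conn_3]},
  "postgres://localhost/org_b" => {loaned: {}, available: [:conn_4]},
}

# Same transformation as #summarize: replace each collection with its size.
summary = dbs_for_urls.transform_values do |loans|
  {loaned: loans[:loaned].size, available: loans[:available].size}
end

summary.each { |url, counts| puts "#{url}: #{counts}" }
```

The result keeps one entry per URL, with integer counts in place of the underlying connection objects, so it should be safe to log or expose in diagnostics as long as the URLs themselves do not contain credentials.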