Class: Mongo::Pool
Constant Summary collapse
- PING_ATTEMPTS = 6
- MAX_PING_TIME = 1_000_000
- PRUNE_INTERVAL = 10_000
Instance Attribute Summary collapse
-
#address ⇒ Object
Returns the value of attribute address.
-
#checked_out ⇒ Object
Returns the value of attribute checked_out.
-
#client ⇒ Object
Returns the value of attribute client.
-
#host ⇒ Object
Returns the value of attribute host.
-
#port ⇒ Object
Returns the value of attribute port.
-
#size ⇒ Object
Returns the value of attribute size.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary collapse
-
#authenticate_existing ⇒ Object
If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket.
-
#checkin(socket) ⇒ Object
Return a socket to the pool.
-
#checkout ⇒ Object
Check out an existing socket or create a new socket if the maximum pool size has not been exceeded.
-
#checkout_existing_socket(socket = nil) ⇒ Object
Checks out the first available socket from the pool.
-
#checkout_new_socket ⇒ Object
Adds a new socket to the pool and checks it out.
-
#close(opts = {}) ⇒ Object
Close this pool.
- #closed? ⇒ Boolean
- #host_port ⇒ Object
- #host_string ⇒ Object
-
#initialize(client, host, port, opts = {}) ⇒ Pool
constructor
Create a new pool of connections.
- #inspect ⇒ Object
-
#logout_existing(db) ⇒ Object
Store the logout op for each existing socket to be applied before the next use of each socket.
- #matches_mode(mode) ⇒ Object
- #matches_tag_set(tag_set) ⇒ Object
- #matches_tag_sets(tag_sets) ⇒ Object
- #ping ⇒ Object
-
#ping_time ⇒ Object
Refresh ping time only if we haven’t checked within the last five minutes.
- #prune_thread_socket_hash ⇒ Object
-
#refresh_ping_time ⇒ Object
Return the time it takes on average to do a round-trip against this node.
- #tags ⇒ Object
- #up? ⇒ Boolean
Constructor Details
#initialize(client, host, port, opts = {}) ⇒ Pool
Create a new pool of connections.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/mongo/util/pool.rb', line 33
#
# Create a new pool of connections.
#
# @param client the owning client; stored as @client
# @param host   host this pool connects to
# @param port   port this pool connects to
# @param opts [Hash] optional settings:
#   :node    - stored as @node (a Mongo::Node object)
#   :size    - maximum pool size, defaults to 20
#   :timeout - checkout timeout in seconds, defaults to 30
def initialize(client, host, port, opts={})
  @client = client
  @host = host
  @port = port

  # A Mongo::Node object.
  @node = opts[:node]

  # The string address, "host:port".
  @address = "#{@host}:#{@port}"

  # Pool size and timeout.
  @size    = opts.fetch(:size, 20)
  @timeout = opts.fetch(:timeout, 30)

  # Synchronization primitives: one mutex guards pool access, one guards
  # pings, and the condition variable is used for signal and wait.
  @connection_mutex = Mutex.new
  @ping_mutex       = Mutex.new
  @queue            = ConditionVariable.new

  # Operations to perform on a socket; each socket lazily gets its own list.
  @socket_ops = Hash.new { |ops, sock| ops[sock] = [] }

  # Socket bookkeeping.
  @sockets            = []
  @pids               = {}
  @checked_out        = []
  @threads_to_sockets = {}

  # Ping cache and lifecycle state.
  @ping_time = nil
  @last_ping = nil
  @closed    = false

  @checkout_counter = 0
end
Instance Attribute Details
#address ⇒ Object
Returns the value of attribute address.
24 25 26 |
# File 'lib/mongo/util/pool.rb', line 24
#
# Reader for the address attribute.
#
# @return the current value of @address
def address
  @address
end
#checked_out ⇒ Object
Returns the value of attribute checked_out.
24 25 26 |
# File 'lib/mongo/util/pool.rb', line 24
#
# Reader for the checked_out attribute.
#
# @return the current value of @checked_out (the same object, not a copy)
def checked_out
  @checked_out
end
#client ⇒ Object
Returns the value of attribute client.
24 25 26 |
# File 'lib/mongo/util/pool.rb', line 24
#
# Reader for the client attribute.
#
# @return the current value of @client
def client
  @client
end
#host ⇒ Object
Returns the value of attribute host.
24 25 26 |
# File 'lib/mongo/util/pool.rb', line 24
#
# Reader for the host attribute.
#
# @return the current value of @host
def host
  @host
end
#port ⇒ Object
Returns the value of attribute port.
24 25 26 |
# File 'lib/mongo/util/pool.rb', line 24
#
# Reader for the port attribute.
#
# @return the current value of @port
def port
  @port
end
#size ⇒ Object
Returns the value of attribute size.
24 25 26 |
# File 'lib/mongo/util/pool.rb', line 24
#
# Reader for the size attribute.
#
# @return the current value of @size
def size
  @size
end
#timeout ⇒ Object
Returns the value of attribute timeout.
24 25 26 |
# File 'lib/mongo/util/pool.rb', line 24
#
# Reader for the timeout attribute.
#
# @return the current value of @timeout
def timeout
  @timeout
end
Instance Method Details
#authenticate_existing ⇒ Object
If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket. So, for every existing socket, we queue a call to the client's apply_saved_authentication method; the queued call is run right before the next use of that socket.
218 219 220 221 222 223 224 225 226 |
# File 'lib/mongo/util/pool.rb', line 218
#
# Queue, for every socket currently in the pool, an operation that
# re-applies the client's saved authentication on that socket. The
# operations are only stored here (in @socket_ops); they are not run by
# this method.
#
# @return the result of iterating @sockets (the @sockets array itself)
def authenticate_existing
  @connection_mutex.synchronize do
    @sockets.each do |sock|
      reauthenticate = Proc.new do
        @client.apply_saved_authentication(:socket => sock)
      end
      @socket_ops[sock] << reauthenticate
    end
  end
end
#checkin(socket) ⇒ Object
Return a socket to the pool.
178 179 180 181 182 183 184 185 186 187 |
# File 'lib/mongo/util/pool.rb', line 178
#
# Return a socket to the pool.
#
# Removes the socket from the checked-out list and signals one thread
# waiting on the pool's condition variable.
#
# @param socket the socket being returned
# @return [Boolean] true if the socket was checked out and has now been
#   released; false if it was not in the checked-out list
def checkin(socket)
  @connection_mutex.synchronize do
    # Nothing to release (and nobody to wake) if it was never checked out.
    return false unless @checked_out.delete(socket)
    @queue.signal
  end
  true
end
#checkout ⇒ Object
Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/mongo/util/pool.rb', line 277
#
# Check out an existing socket or create a new socket if the maximum pool
# size has not been exceeded. Otherwise, wait for the next available socket.
#
# A thread that has checked out before is handed the socket recorded for it
# in @threads_to_sockets, provided that socket is not currently checked
# out. Queued per-socket operations (@socket_ops) are run before the socket
# is returned, and a socket found closed is replaced with a new one.
#
# @return the checked-out socket
# @raise [ConnectionTimeoutError] if no socket could be obtained within
#   @timeout seconds
def checkout
  @client.connect if !@client.connected?
  start_time = Time.now
  loop do
    if (Time.now - start_time) > @timeout
      raise ConnectionTimeoutError, "could not obtain connection within " +
        "#{@timeout} seconds. The max pool size is currently #{@size}; " +
        "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      # Every PRUNE_INTERVAL checkouts, drop mappings for threads that no
      # longer exist.
      if @checkout_counter > PRUNE_INTERVAL
        @checkout_counter = 0
        prune_thread_socket_hash
      else
        @checkout_counter += 1
      end

      socket = nil
      if socket_for_thread = @threads_to_sockets[Thread.current]
        if !@checked_out.include?(socket_for_thread)
          socket = checkout_existing_socket(socket_for_thread)
        end
      else
        # First checkout for this thread
        thread_length = @threads_to_sockets.keys.length
        if (thread_length <= @sockets.size) && (@sockets.size < @size)
          socket = checkout_new_socket
        elsif @checked_out.size < @sockets.size
          socket = checkout_existing_socket
        elsif @sockets.size < @size
          socket = checkout_new_socket
        end
      end

      if socket
        # This calls all procs, in order, scoped to existing sockets.
        # At the moment, we use this to lazily authenticate and
        # logout existing socket connections. An op is dropped from the
        # list once its call returns a truthy value.
        @socket_ops[socket].reject! { |op| op.call }

        if socket.closed?
          # Forget every trace of the dead socket before replacing it.
          # delete_if is used rather than deleting from inside #each.
          @checked_out.delete(socket)
          @sockets.delete(socket)
          @threads_to_sockets.delete_if { |_thread, mapped| mapped == socket }
          @pids.delete(socket)
          @socket_ops.delete(socket)
          socket = checkout_new_socket
        end

        return socket
      else
        # Otherwise, wait -- but never past the deadline. An unbounded wait
        # would block forever if no checkin ever signals, and the timeout
        # check at the top of the loop would never run.
        remaining = @timeout - (Time.now - start_time)
        @queue.wait(@connection_mutex, remaining > 0 ? remaining : 0)
      end
    end
  end
end
#checkout_existing_socket(socket = nil) ⇒ Object
Checks out the first available socket from the pool.
If the pid has changed, remove the socket and check out a new one.
This method is called exclusively from #checkout; therefore, it runs within a mutex.
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/mongo/util/pool.rb', line 247
#
# Checks out the first available socket from the pool.
#
# If the pid recorded for the socket differs from the current process id
# (this is also the case when no socket is available at all), the socket is
# removed from the pool, closed if still open, and a new socket is checked
# out instead.
#
# This method is called exclusively from #checkout; therefore, it runs
# within a mutex.
#
# @param socket a specific socket to check out; when omitted, the first
#   socket in @sockets that is not in @checked_out is used
# @return the checked-out socket (the given one or a newly created one)
def checkout_existing_socket(socket=nil)
  socket ||= (@sockets - @checked_out).first

  if @pids[socket] != Process.pid
    # Remove the entry outright. Assigning nil would keep the discarded
    # socket alive as a hash key, and would add a nil key when no socket
    # was available.
    @pids.delete(socket)
    @sockets.delete(socket)
    socket.close if socket && !socket.closed?
    checkout_new_socket
  else
    @checked_out << socket
    @threads_to_sockets[Thread.current] = socket
    socket
  end
end
#checkout_new_socket ⇒ Object
Adds a new socket to the pool and checks it out.
This method is called exclusively from #checkout; therefore, it runs within a mutex.
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/mongo/util/pool.rb', line 193
#
# Adds a new socket to the pool and checks it out.
#
# This method is called exclusively from #checkout; therefore, it runs
# within a mutex.
#
# @return the newly created, checked-out socket
# @raise [ConnectionFailure] if the socket cannot be created or set up; in
#   that case a half-built socket is closed and the node (if any) is closed
#   before the error is raised
def checkout_new_socket
  begin
    socket = @client.socket_class.new(@host, @port, @client.op_timeout)
    socket.pool = self
  rescue => ex
    socket.close if socket
    # Close the node before raising: placed after the raise, this
    # statement could never run.
    @node.close if @node
    raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
  end

  # If any saved authentications exist, we want to apply those
  # when creating new sockets.
  @client.apply_saved_authentication(:socket => socket)

  @sockets << socket
  @pids[socket] = Process.pid
  @checked_out << socket
  @threads_to_sockets[Thread.current] = socket
  socket
end
#close(opts = {}) ⇒ Object
Close this pool.
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/mongo/util/pool.rb', line 74
#
# Close this pool.
#
# With :soft set and at least one socket checked out, only the sockets that
# are not checked out are closed and the pool is flagged as closing.
# Otherwise every socket is closed and the pool is flagged as closed. The
# node, if present, is closed in both cases.
#
# @param opts [Hash] options; :soft requests the soft close described above
# @return [true]
def close(opts={})
  @connection_mutex.synchronize do
    soft_close = opts[:soft] && !@checked_out.empty?

    if soft_close
      @closing = true
      idle_sockets = @sockets - @checked_out
      close_sockets(idle_sockets)
    else
      close_sockets(@sockets)
      @closed = true
    end

    @node.close if @node
  end
  true
end
#closed? ⇒ Boolean
92 93 94 |
# File 'lib/mongo/util/pool.rb', line 92
#
# Whether this pool has been closed.
#
# @return the current value of @closed
def closed?
  @closed
end
#host_port ⇒ Object
129 130 131 |
# File 'lib/mongo/util/pool.rb', line 129
#
# The pool's host and port as a pair.
#
# @return [Array] a new two-element array: [@host, @port]
def host_port
  [@host, @port]
end
#host_string ⇒ Object
125 126 127 |
# File 'lib/mongo/util/pool.rb', line 125
#
# The pool's host and port joined by a colon.
#
# @return [String] "host:port"
def host_string
  format("%s:%s", @host, @port)
end
#inspect ⇒ Object
119 120 121 122 123 |
# File 'lib/mongo/util/pool.rb', line 119
#
# Human-readable summary of the pool: object id (hex), host, port, cached
# ping time, the checked-out count over the configured size, and whether
# the pool is up (i.e. not closed).
#
# @return [String]
def inspect
  identity = "#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} "
  usage    = "@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available "
  status   = "up=#{!closed?}>"
  identity + usage + status
end
#logout_existing(db) ⇒ Object
Store the logout op for each existing socket to be applied before the next use of each socket.
230 231 232 233 234 235 236 237 238 |
# File 'lib/mongo/util/pool.rb', line 230
#
# Store the logout op for each existing socket to be applied before the
# next use of each socket. The ops are only queued here (in @socket_ops);
# they are not run by this method.
#
# @param db passed to @client.db when a queued op runs
# @return the result of iterating @sockets (the @sockets array itself)
def logout_existing(db)
  @connection_mutex.synchronize do
    @sockets.each do |sock|
      logout = Proc.new do
        @client.db(db).issue_logout(:socket => sock)
      end
      @socket_ops[sock] << logout
    end
  end
end
#matches_mode(mode) ⇒ Object
100 101 102 103 104 105 106 107 |
# File 'lib/mongo/util/pool.rb', line 100
#
# Whether this pool's node is compatible with the requested mode.
#
# @param mode the requested mode; :primary and :secondary are checked
#   against the node, any other value always matches
# @return [Boolean] false when :primary is requested but the node reports
#   secondary?, or :secondary is requested but the node reports primary?;
#   true otherwise
def matches_mode(mode)
  return false if mode == :primary && @node.secondary?
  return false if mode == :secondary && @node.primary?
  true
end
#matches_tag_set(tag_set) ⇒ Object
109 110 111 112 113 |
# File 'lib/mongo/util/pool.rb', line 109
#
# Whether every tag/value pair in the given tag set is present, with an
# equal value, in this pool's tags.
#
# The receiver of has_key? and [] had been stripped from this listing; it
# is restored here as the pool's own #tags.
#
# @param tag_set an enumerable of tag/value pairs (e.g. a Hash)
# @return [Boolean] true if all pairs match (also true for an empty set)
def matches_tag_set(tag_set)
  tag_set.all? do |tag, value|
    # has_key? is checked first so that a missing tag never matches a nil
    # value.
    tags.has_key?(tag) && tags[tag] == value
  end
end
#matches_tag_sets(tag_sets) ⇒ Object
115 116 117 |
# File 'lib/mongo/util/pool.rb', line 115
#
# Whether every one of the given tag sets matches, as decided by
# #matches_tag_set.
#
# @param tag_sets an enumerable of tag sets
# @return [Boolean] true if all tag sets match (also true when empty)
def matches_tag_sets(tag_sets)
  tag_sets.all? do |candidate|
    matches_tag_set(candidate)
  end
end
#ping ⇒ Object
169 170 171 172 173 174 175 |
# File 'lib/mongo/util/pool.rb', line 169
#
# Issue a { ping: 1 } command against the 'admin' database over the node's
# socket, passing MAX_PING_TIME as the :timeout option.
#
# @return the command's result, or false if the command raised
#   ConnectionFailure, OperationFailure, SocketError, SystemCallError or
#   IOError
def ping
  client['admin'].command({:ping => 1}, :socket => @node.socket, :timeout => MAX_PING_TIME)
rescue ConnectionFailure, OperationFailure, SocketError, SystemCallError, IOError
  false
end
#ping_time ⇒ Object
Refresh ping time only if we haven’t checked within the last five minutes.
135 136 137 138 139 140 141 142 143 |
# File 'lib/mongo/util/pool.rb', line 135
#
# Cached ping time for this pool.
#
# Refresh ping time only if we haven't checked within the last five
# minutes (300 seconds) or have never checked at all.
#
# @return the cached value of @ping_time, refreshed first when stale
def ping_time
  @ping_mutex.synchronize do
    needs_refresh = !@last_ping || (Time.now - @last_ping) > 300
    if needs_refresh
      @ping_time = refresh_ping_time
      @last_ping = Time.now
    end
  end
  @ping_time
end
#prune_thread_socket_hash ⇒ Object
266 267 268 269 270 271 272 |
# File 'lib/mongo/util/pool.rb', line 266
#
# Remove from @threads_to_sockets every entry whose thread is no longer
# listed by Thread.list.
#
# @return the pruned @threads_to_sockets hash
def prune_thread_socket_hash
  live_threads = Set.new(Thread.list)

  @threads_to_sockets.delete_if do |thread, _socket|
    !live_threads.include?(thread)
  end
end
#refresh_ping_time ⇒ Object
Return the time it takes on average to do a round-trip against this node.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/mongo/util/pool.rb', line 147
#
# Return the time it takes on average to do a round-trip against this node.
#
# Pings the node PING_ATTEMPTS times, timing each attempt in milliseconds,
# discards the shortest and longest timings, and averages the rest.
#
# @return [Integer] the average rounded up, or MAX_PING_TIME as soon as any
#   ping fails
def refresh_ping_time
  samples = []

  PING_ATTEMPTS.times do
    started_at = Time.now
    return MAX_PING_TIME unless self.ping
    samples << (Time.now - started_at) * 1000
  end

  # Delete longest and shortest times
  samples.sort!
  samples.pop
  samples.shift

  sum = samples.inject(0.0) { |acc, millis| acc + millis }
  (sum / samples.length).ceil
end
#tags ⇒ Object
88 89 90 |
# File 'lib/mongo/util/pool.rb', line 88
#
# The tags of this pool's node.
#
# The method name and the message sent to @node had been stripped from
# this listing; both are restored here as `tags`.
#
# @return whatever @node.tags returns
def tags
  @node.tags
end
#up? ⇒ Boolean
96 97 98 |
# File 'lib/mongo/util/pool.rb', line 96
#
# Whether this pool is up, i.e. has not been closed.
#
# @return [Boolean] the negation of @closed
def up?
  !@closed
end