Class: Mongo::Pool
Instance Attribute Summary
-
#checked_out ⇒ Object
Returns the value of attribute checked_out.
-
#host ⇒ Object
Returns the value of attribute host.
-
#port ⇒ Object
Returns the value of attribute port.
-
#safe ⇒ Object
Returns the value of attribute safe.
-
#size ⇒ Object
Returns the value of attribute size.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#unix_socket_path ⇒ Object
Returns the value of attribute unix_socket_path.
Instance Method Summary
-
#authenticate_existing ⇒ Object
If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket.
-
#checkin(socket) ⇒ Object
Return a socket to the pool.
-
#checkout ⇒ Object
Check out an existing socket or create a new socket if the maximum pool size has not been exceeded.
-
#checkout_existing_socket ⇒ Object
Checks out the first available socket from the pool.
-
#checkout_new_socket ⇒ Object
Adds a new socket to the pool and checks it out.
-
#close ⇒ Object
Close every socket in the pool and reset the pool's host, port and socket lists.
-
#initialize(connection, host, port, opts = {}) ⇒ Pool
constructor
Create a new pool of connections.
-
#logout_existing(db) ⇒ Object
Store the logout op for each existing socket to be applied before the next use of each socket.
-
#to_s ⇒ Object
Return the UNIX socket path if one is set, otherwise "host:port".
Constructor Details
#initialize(connection, host, port, opts = {}) ⇒ Pool
Create a new pool of connections.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/mongo/util/pool.rb', line 25
#
# Create a new pool of connections.
#
# When +port+ is nil or false, +host+ is stored as the UNIX socket path
# (@unix_socket_path) and @host is set to nil.
#
# @param connection the owning connection object, stored as @connection
# @param host host name, or the UNIX socket path when +port+ is falsy
# @param port port number, or nil/false to select a UNIX socket
# @param opts [Hash] recognised keys: :size (default 1) and
#   :timeout (default 5.0)
def initialize(connection, host, port, opts = {})
  @connection = connection

  # A missing port means +host+ actually names a UNIX socket path.
  @host = port ? host : nil
  @port = port
  @unix_socket_path = host unless port

  # Pool size and timeout.
  @size    = opts[:size] || 1
  @timeout = opts[:timeout] || 5.0

  # Mutex for synchronizing pool access, and the condition variable
  # used to signal and wait.
  @connection_mutex = Mutex.new
  @queue            = ConditionVariable.new

  # Operations to perform on a socket, keyed by socket; each key gets
  # its own array on first access.
  @socket_ops  = Hash.new { |ops, sock| ops[sock] = [] }
  @sockets     = []
  @checked_out = []
end
Instance Attribute Details
#checked_out ⇒ Object
Returns the value of attribute checked_out.
21 22 23 |
# File 'lib/mongo/util/pool.rb', line 21
#
# Returns the value of attribute checked_out.
#
# @return the current value of @checked_out
def checked_out
  @checked_out
end
#host ⇒ Object
Returns the value of attribute host.
21 22 23 |
# File 'lib/mongo/util/pool.rb', line 21
#
# Returns the value of attribute host.
#
# @return the current value of @host
def host
  @host
end
#port ⇒ Object
Returns the value of attribute port.
21 22 23 |
# File 'lib/mongo/util/pool.rb', line 21
#
# Returns the value of attribute port.
#
# @return the current value of @port
def port
  @port
end
#safe ⇒ Object
Returns the value of attribute safe.
21 22 23 |
# File 'lib/mongo/util/pool.rb', line 21
#
# Returns the value of attribute safe.
#
# @return the current value of @safe
def safe
  @safe
end
#size ⇒ Object
Returns the value of attribute size.
21 22 23 |
# File 'lib/mongo/util/pool.rb', line 21
#
# Returns the value of attribute size.
#
# @return the current value of @size
def size
  @size
end
#timeout ⇒ Object
Returns the value of attribute timeout.
21 22 23 |
# File 'lib/mongo/util/pool.rb', line 21
#
# Returns the value of attribute timeout.
#
# @return the current value of @timeout
def timeout
  @timeout
end
#unix_socket_path ⇒ Object
Returns the value of attribute unix_socket_path.
21 22 23 |
# File 'lib/mongo/util/pool.rb', line 21
#
# Returns the value of attribute unix_socket_path.
#
# @return the current value of @unix_socket_path
def unix_socket_path
  @unix_socket_path
end
Instance Method Details
#authenticate_existing ⇒ Object
If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket. So we store the apply_authentication method, and this will be applied right before the next use of each socket.
103 104 105 106 107 108 109 110 111 |
# File 'lib/mongo/util/pool.rb', line 103
#
# Queue a deferred authentication op for every socket currently in the
# pool. Nothing is executed here: each op is appended to that socket's
# entry in @socket_ops and, when later called, invokes
# @connection.apply_saved_authentication with that socket.
#
# Runs under @connection_mutex.
def authenticate_existing
  @connection_mutex.synchronize do
    @sockets.each do |sock|
      deferred_auth = proc { @connection.apply_saved_authentication(:socket => sock) }
      @socket_ops[sock].push(deferred_auth)
    end
  end
end
#checkin(socket) ⇒ Object
Return a socket to the pool.
66 67 68 69 70 71 72 |
# File 'lib/mongo/util/pool.rb', line 66
#
# Return a socket to the pool.
#
# Under @connection_mutex, removes +socket+ from @checked_out and signals
# @queue so that one thread waiting on it is woken.
#
# @param socket the socket being returned
# @return [true] always
def checkin(socket)
  @connection_mutex.synchronize {
    @checked_out.delete(socket)
    @queue.signal
  }

  true
end
#checkout ⇒ Object
Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/mongo/util/pool.rb', line 138
#
# Check out an existing socket or create a new socket if the maximum pool
# size has not been exceeded. Otherwise, wait for the next available socket.
#
# Connects @connection first if it reports it is not connected. Before a
# socket is returned, every op queued for it in @socket_ops is called in
# order; ops whose call returns a truthy value are removed from the queue,
# the rest stay queued.
#
# @return the checked-out socket
# @raise [ConnectionTimeoutError] when more than @timeout seconds elapse
#   without a socket becoming available
def checkout
  @connection.connect unless @connection.connected?
  start_time = Time.now

  loop do
    if (Time.now - start_time) > @timeout
      raise ConnectionTimeoutError, "could not obtain connection within " +
        "#{@timeout} seconds. The max pool size is currently #{@size}; " +
        "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      socket = if @checked_out.size < @sockets.size
                 checkout_existing_socket
               elsif @sockets.size < @size
                 checkout_new_socket
               end

      if socket
        # This calls all procs, in order, scoped to existing sockets.
        # At the moment, we use this to lazily authenticate and
        # logout existing socket connections.
        @socket_ops[socket].reject! { |op| op.call }
        return socket
      end

      # Otherwise, wait.
      if @logger
        @logger.warn "MONGODB Waiting for available connection; " +
          "#{@checked_out.size} of #{@size} connections checked out."
      end

      # Bound the wait by the time left before the deadline. An unbounded
      # wait would block forever when no socket is ever checked in, and
      # the timeout check at the top of the loop would never run again.
      remaining = @timeout - (Time.now - start_time)
      @queue.wait(@connection_mutex, remaining) if remaining > 0
    end
  end
end
#checkout_existing_socket ⇒ Object
Checks out the first available socket from the pool.
This method is called exclusively from #checkout; therefore, it runs within a mutex.
129 130 131 132 133 |
# File 'lib/mongo/util/pool.rb', line 129
#
# Checks out the first available socket from the pool: the first entry of
# @sockets that is not in @checked_out. The socket is appended to
# @checked_out and returned.
#
# This method is called exclusively from #checkout; therefore, it runs
# within a mutex.
#
# @return the socket that was checked out
def checkout_existing_socket
  idle = @sockets - @checked_out
  socket = idle.first
  @checked_out.push(socket)
  socket
end
#checkout_new_socket ⇒ Object
Adds a new socket to the pool and checks it out.
This method is called exclusively from #checkout; therefore, it runs within a mutex.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/mongo/util/pool.rb', line 78
#
# Adds a new socket to the pool and checks it out.
#
# Opens a UNIXSocket to @unix_socket_path when that is set; otherwise opens
# a TCPSocket to @host/@port with TCP_NODELAY enabled. Saved
# authentications are applied to the new socket before it is added to
# @sockets and @checked_out.
#
# If anything fails after the socket has been opened (setting the socket
# option or applying saved authentication), the socket is closed before the
# error propagates, so a failed checkout does not leak an open socket.
#
# This method is called exclusively from #checkout; therefore, it runs
# within a mutex.
#
# @return the newly created socket
# @raise [ConnectionFailure] when the socket cannot be opened or configured
def checkout_new_socket
  socket = nil
  pooled = false

  begin
    begin
      if @unix_socket_path
        socket = UNIXSocket.new(@unix_socket_path)
      else
        socket = TCPSocket.new(@host, @port)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      end
    rescue => ex
      raise ConnectionFailure, "Failed to connect to #{self.to_s}: #{ex}"
    end

    # If any saved authentications exist, we want to apply those
    # when creating new sockets.
    @connection.apply_saved_authentication(:socket => socket)

    @sockets << socket
    @checked_out << socket
    pooled = true
  ensure
    # The socket never made it into the pool, so nothing else will ever
    # close it. Cleanup is best-effort; the original error propagates.
    unless pooled || socket.nil?
      begin
        socket.close
      rescue IOError, SystemCallError
      end
    end
  end

  socket
end
#close ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/mongo/util/pool.rb', line 52
#
# Close every socket in the pool and reset the pool's state.
#
# Closing is best-effort: an IOError or SystemCallError raised by one
# socket's #close is reported with +warn+ and the remaining sockets are
# still closed. Afterwards @host, @port and @unix_socket_path are nil, and
# @sockets, @socket_ops and @checked_out are empty.
#
# @return [Array] the (now empty) @checked_out array
def close
  @sockets.each do |sock|
    begin
      sock.close
    rescue IOError, SystemCallError => ex
      # Also rescue SystemCallError: an Errno from one socket must not
      # abort the loop and leave the remaining sockets open.
      warn "IOError when attempting to close socket connected to #{self.to_s}: #{ex.inspect}"
    end
  end

  @host = @port = @unix_socket_path = nil
  @sockets.clear
  # Drop ops queued for sockets that no longer exist, so the closed
  # sockets and their procs are not kept alive by this hash.
  @socket_ops.clear
  @checked_out.clear
end
#logout_existing(db) ⇒ Object
Store the logout op for each existing socket to be applied before the next use of each socket.
115 116 117 118 119 120 121 122 123 |
# File 'lib/mongo/util/pool.rb', line 115
#
# Store the logout op for each existing socket to be applied before the
# next use of each socket. Nothing is executed here: each op is appended to
# that socket's entry in @socket_ops and, when later called, invokes
# @connection.db(db).issue_logout with that socket.
#
# Runs under @connection_mutex.
#
# @param db the database identifier passed to @connection.db
def logout_existing(db)
  @connection_mutex.synchronize do
    @sockets.each do |sock|
      deferred_logout = proc { @connection.db(db).issue_logout(:socket => sock) }
      @socket_ops[sock].push(deferred_logout)
    end
  end
end
#to_s ⇒ Object
177 178 179 180 181 182 183 |
# File 'lib/mongo/util/pool.rb', line 177
#
# Describe where this pool connects to.
#
# @return [String] the UNIX socket path when @unix_socket_path is set,
#   otherwise "host:port" built from @host and @port
def to_s
  return "#{@unix_socket_path}" if @unix_socket_path

  "#{@host}:#{@port}"
end