Class: Mongo::Pool

Inherits:
Object show all
Defined in:
lib/mongo/util/pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, host, port, opts = {}) ⇒ Pool

Create a new pool of connections.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/mongo/util/pool.rb', line 25

def initialize(connection, host, port, opts={})
  @connection  = connection

  @host, @port = host, port

  unless @port
    @unix_socket_path = host
    @host = nil
  end

  # Pool size and timeout.
  @size      = opts[:size] || 1
  @timeout   = opts[:timeout]   || 5.0

  # Mutex for synchronizing pool access
  @connection_mutex = Mutex.new

  # Condition variable for signal and wait
  @queue = ConditionVariable.new

  # Operations to perform on a socket
  @socket_ops = Hash.new { |h, k| h[k] = [] }

  @sockets      = []
  @checked_out  = []
end

Instance Attribute Details

#checked_outObject

Returns the value of attribute checked_out.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def checked_out
  @checked_out
end

#hostObject

Returns the value of attribute host.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def host
  @host
end

#portObject

Returns the value of attribute port.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def port
  @port
end

#safeObject

Returns the value of attribute safe.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def safe
  @safe
end

#sizeObject

Returns the value of attribute size.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def size
  @size
end

#timeoutObject

Returns the value of attribute timeout.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def timeout
  @timeout
end

#unix_socket_pathObject

Returns the value of attribute unix_socket_path.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def unix_socket_path
  @unix_socket_path
end

Instance Method Details

#authenticate_existingObject

If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket. So we store the apply_authentication method, and this will be applied right before the next use of each socket.



103
104
105
106
107
108
109
110
111
# File 'lib/mongo/util/pool.rb', line 103

def authenticate_existing
  @connection_mutex.synchronize do
    @sockets.each do |socket|
      @socket_ops[socket] << Proc.new do
        @connection.apply_saved_authentication(:socket => socket)
      end
    end
  end
end

#checkin(socket) ⇒ Object

Return a socket to the pool.



66
67
68
69
70
71
72
# File 'lib/mongo/util/pool.rb', line 66

def checkin(socket)
  @connection_mutex.synchronize do
    @checked_out.delete(socket)
    @queue.signal
  end
  true
end

#checkoutObject

Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/mongo/util/pool.rb', line 138

def checkout
  @connection.connect if !@connection.connected?
  start_time = Time.now
  loop do
    if (Time.now - start_time) > @timeout
        raise ConnectionTimeoutError, "could not obtain connection within " +
          "#{@timeout} seconds. The max pool size is currently #{@size}; " +
          "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      socket = if @checked_out.size < @sockets.size
                 checkout_existing_socket
               elsif @sockets.size < @size
                 checkout_new_socket
               end

      if socket

      # This calls all procs, in order, scoped to existing sockets.
      # At the moment, we use this to lazily authenticate and
      # logout existing socket connections.
      @socket_ops[socket].reject! do |op|
        op.call
      end

        return socket
      else
        # Otherwise, wait
        if @logger
          @logger.warn "MONGODB Waiting for available connection; " +
            "#{@checked_out.size} of #{@size} connections checked out."
        end
        @queue.wait(@connection_mutex)
      end
    end
  end
end

#checkout_existing_socketObject

Checks out the first available socket from the pool.

This method is called exclusively from #checkout; therefore, it runs within a mutex.



129
130
131
132
133
# File 'lib/mongo/util/pool.rb', line 129

def checkout_existing_socket
  socket = (@sockets - @checked_out).first
  @checked_out << socket
  socket
end

#checkout_new_socketObject

Adds a new socket to the pool and checks it out.

This method is called exclusively from #checkout; therefore, it runs within a mutex.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/mongo/util/pool.rb', line 78

def checkout_new_socket
  begin
    if @unix_socket_path
      socket = UNIXSocket.new(@unix_socket_path)
    else
      socket = TCPSocket.new(@host, @port)
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    end
  rescue => ex
    raise ConnectionFailure, "Failed to connect to #{self.to_s}: #{ex}"
  end

  # If any saved authentications exist, we want to apply those
  # when creating new sockets.
  @connection.apply_saved_authentication(:socket => socket)

  @sockets << socket
  @checked_out << socket
  socket
end

#closeObject



52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/mongo/util/pool.rb', line 52

def close
  @sockets.each do |sock|
    begin
      sock.close
    rescue IOError => ex
      warn "IOError when attempting to close socket connected to #{self.to_s}: #{ex.inspect}"
    end
  end
  @host = @port = @unix_socket_path = nil
  @sockets.clear
  @checked_out.clear
end

#logout_existing(db) ⇒ Object

Store the logout op for each existing socket to be applied before the next use of each socket.



115
116
117
118
119
120
121
122
123
# File 'lib/mongo/util/pool.rb', line 115

def logout_existing(db)
  @connection_mutex.synchronize do
    @sockets.each do |socket|
      @socket_ops[socket] << Proc.new do
        @connection.db(db).issue_logout(:socket => socket)
      end
    end
  end
end

#to_sObject



177
178
179
180
181
182
183
# File 'lib/mongo/util/pool.rb', line 177

def to_s
  if @unix_socket_path
    "#{@unix_socket_path}"
  else
    "#{@host}:#{@port}"
  end
end