Class: Mongo::Pool

Inherits:
Object show all
Defined in:
lib/mongo/util/pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, host, port, opts = {}) ⇒ Pool

Create a new pool of connections.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/mongo/util/pool.rb', line 25

def initialize(connection, host, port, opts={})
  @connection  = connection

  @host, @port = host, port

  # Pool size and timeout.
  @size      = opts[:size] || 1
  @timeout   = opts[:timeout]   || 5.0

  # Mutex for synchronizing pool access
  @connection_mutex = Mutex.new

  # Condition variable for signal and wait
  @queue = ConditionVariable.new

  # Operations to perform on a socket
  @socket_ops = Hash.new { |h, k| h[k] = [] }

  @sockets      = []
  @pids         = {}
  @checked_out  = []
end

Instance Attribute Details

#checked_outObject

Returns the value of attribute checked_out.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def checked_out
  @checked_out
end

#hostObject

Returns the value of attribute host.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def host
  @host
end

#portObject

Returns the value of attribute port.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def port
  @port
end

#safeObject

Returns the value of attribute safe.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def safe
  @safe
end

#sizeObject

Returns the value of attribute size.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def size
  @size
end

#timeoutObject

Returns the value of attribute timeout.



21
22
23
# File 'lib/mongo/util/pool.rb', line 21

def timeout
  @timeout
end

Instance Method Details

#authenticate_existingObject

If a user calls DB#authenticate, and several sockets exist, then we need a way to apply the authentication on each socket. So we store the apply_authentication method, and this will be applied right before the next use of each socket.



97
98
99
100
101
102
103
104
105
# File 'lib/mongo/util/pool.rb', line 97

def authenticate_existing
  @connection_mutex.synchronize do
    @sockets.each do |socket|
      @socket_ops[socket] << Proc.new do
        @connection.apply_saved_authentication(:socket => socket)
      end
    end
  end
end

#checkin(socket) ⇒ Object

Return a socket to the pool.



63
64
65
66
67
68
69
# File 'lib/mongo/util/pool.rb', line 63

def checkin(socket)
  @connection_mutex.synchronize do
    @checked_out.delete(socket)
    @queue.signal
  end
  true
end

#checkoutObject

Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/mongo/util/pool.rb', line 142

def checkout
  @connection.connect if !@connection.connected?
  start_time = Time.now
  loop do
    if (Time.now - start_time) > @timeout
        raise ConnectionTimeoutError, "could not obtain connection within " +
          "#{@timeout} seconds. The max pool size is currently #{@size}; " +
          "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      socket = if @checked_out.size < @sockets.size
                 checkout_existing_socket
               elsif @sockets.size < @size
                 checkout_new_socket
               end

      if socket

      # This calls all procs, in order, scoped to existing sockets.
      # At the moment, we use this to lazily authenticate and
      # logout existing socket connections.
      @socket_ops[socket].reject! do |op|
        op.call
      end

        return socket
      else
        # Otherwise, wait
        @queue.wait(@connection_mutex)
      end
    end
  end
end

#checkout_existing_socketObject

Checks out the first available socket from the pool.

If the pid has changed, remove the socket and check out new one.

This method is called exclusively from #checkout; therefore, it runs within a mutex.



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/mongo/util/pool.rb', line 126

def checkout_existing_socket
  socket = (@sockets - @checked_out).first
  if @pids[socket] != Process.pid
     @pids[socket] = nil
     @sockets.delete(socket)
     socket.close
     checkout_new_socket
  else
    @checked_out << socket
    socket
  end
end

#checkout_new_socketObject

Adds a new socket to the pool and checks it out.

This method is called exclusively from #checkout; therefore, it runs within a mutex.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/mongo/util/pool.rb', line 75

def checkout_new_socket
  begin
  socket = TCPSocket.new(@host, @port)
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  rescue => ex
    raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
  end

  # If any saved authentications exist, we want to apply those
  # when creating new sockets.
  @connection.apply_saved_authentication(:socket => socket)

  @sockets << socket
  @pids[socket] = Process.pid
  @checked_out << socket
  socket
end

#closeObject



48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/mongo/util/pool.rb', line 48

def close
  @sockets.each do |sock|
    begin
      sock.close
    rescue IOError => ex
      warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
    end
  end
  @host = @port = nil
  @sockets.clear
  @pids.clear
  @checked_out.clear
end

#logout_existing(db) ⇒ Object

Store the logout op for each existing socket to be applied before the next use of each socket.



109
110
111
112
113
114
115
116
117
# File 'lib/mongo/util/pool.rb', line 109

def logout_existing(db)
  @connection_mutex.synchronize do
    @sockets.each do |socket|
      @socket_ops[socket] << Proc.new do
        @connection.db(db).issue_logout(:socket => socket)
      end
    end
  end
end