Class: SocketPool

Inherits:
Object
  • Object
show all
Defined in:
lib/socketpool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, opts = {}) ⇒ SocketPool

Create a new socket pool



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/socketpool.rb', line 23

def initialize(host, port, opts={})
  @host, @port = host, port

  # Pool size and timeout.
  @size      = opts[:size] || 2
  @timeout   = opts[:timeout] || 5
  @eager      = opts[:eager] || false

  # Mutex for synchronizing pool access
  @connection_mutex = Mutex.new

  # Condition variable for signal and wait
  @queue = ConditionVariable.new
  
  @socktype     = opts[:type] || :tcp
  @sockopts     = opts[:socketopts].nil? ? [] : [opts[:socketopts]].flatten.inject([]){|s, so| s << so}
  @sockets      = []
  @pids         = {}
  @checked_out  = []
  
  initialize_socketpool if @eager
end

Instance Attribute Details

#checked_outObject

Returns the value of attribute checked_out.



19
20
21
# File 'lib/socketpool.rb', line 19

def checked_out
  @checked_out
end

#hostObject

Returns the value of attribute host.



19
20
21
# File 'lib/socketpool.rb', line 19

def host
  @host
end

#portObject

Returns the value of attribute port.



19
20
21
# File 'lib/socketpool.rb', line 19

def port
  @port
end

#sizeObject

Returns the value of attribute size.



19
20
21
# File 'lib/socketpool.rb', line 19

def size
  @size
end

#timeoutObject

Returns the value of attribute timeout.



19
20
21
# File 'lib/socketpool.rb', line 19

def timeout
  @timeout
end

Instance Method Details

#checkin(sock, reset = false) ⇒ Object

Return a socket to the pool. Allow for destroying a resetting socket if the application determines the connection is no good



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/socketpool.rb', line 62

def checkin(sock, reset=false)
  @connection_mutex.synchronize do
    if reset
      @sockets.delete(sock)
      @checked_out.delete(sock)
      @pids.delete(sock)
      sock.close
      sock = checkout_new_socket
    end

    @checked_out.delete(sock)          
    @queue.signal
  end
  true
end

#checkoutObject

Check out an existing socket or create a new socket if the maximum pool size has not been exceeded. Otherwise, wait for the next available socket.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/socketpool.rb', line 117

def checkout
  start_time = Time.now
  loop do
    if (Time.now - start_time) > @timeout
        raise ConnectionTimeoutError, "could not obtain connection within " +
          "#{@timeout} seconds. The max pool size is currently #{@size}; " +
          "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      socket = if @checked_out.size < @sockets.size
                 checkout_existing_socket
               elsif @sockets.size < @size
                 checkout_new_socket
               end

      if socket
        return socket
      else
        # Otherwise, wait
        @queue.wait(@connection_mutex)
      end
    end
  end
end

#checkout_existing_socketObject

Checks out the first available socket from the pool.

If the pid has changed, remove the socket and check out new one.

This method is called exclusively from #checkout; therefore, it runs within a mutex.



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/socketpool.rb', line 101

def checkout_existing_socket
  socket = (@sockets - @checked_out).first
  if @pids[socket] != Process.pid
     @pids[socket] = nil
     @sockets.delete(socket)
     socket.close
     checkout_new_socket
  else
    @checked_out << socket
    socket
  end
end

#checkout_new_socketObject

Adds a new socket to the pool and checks it out.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/socketpool.rb', line 79

def checkout_new_socket
  socket = Socket.new(so_domain(@socktype), so_type(@socktype), 0)

  # Pack address for sockets and set any options passed
  @sockaddr ||= Socket.pack_sockaddr_in(@port, @host) if ![:unix, :unigram].include?(@socktype)
  @sockaddr ||= Socket.pack_sockaddr_un(@host) if [:unix, :unigram].include?(@socktype)
  @sockopts.each{ |opt| socket.setsockopt(opt[:level], opt[:optname], opt[:optval]) } if @sockopts.size > 0 
  socket.connect(@sockaddr)

  @checked_out << socket
  @sockets << socket
  @pids[socket] = Process.pid
  socket
end

#closeObject



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/socketpool.rb', line 46

def close
  @sockets.each do |sock|
    begin
      sock.close
    rescue IOError => ex
      warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
    end
  end
  @host = @port = nil
  @sockets.clear
  @pids.clear
  @checked_out.clear
end

#so_domain(val) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/socketpool.rb', line 155

def so_domain(val)
  val = val.downcase.to_sym if val.respond_to?(:downcase) && val.respond_to?(:to_sym)
  @so_domain ||= {
    :tcp => Socket::AF_INET,
    :tcp6 => Socket::AF_INET6,
    :udp => Socket::AF_INET,
    :udp6 => Socket::AF_INET6,
    :unix => Socket::AF_UNIX,
    :unigram => Socket::AF_UNIX
  }[val]
end

#so_type(val) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
# File 'lib/socketpool.rb', line 143

def so_type(val)
  val = val.downcase.to_sym if val.respond_to?(:downcase) && val.respond_to?(:to_sym)
  @so_type ||= {
    :tcp => Socket::SOCK_STREAM,
    :tcp6 => Socket::SOCK_STREAM,
    :udp => Socket::SOCK_DGRAM,
    :udp6 => Socket::SOCK_DGRAM,
    :unix => Socket::SOCK_STREAM,
    :unigram => Socket::SOCK_DGRAM
  }[val]
end