Class: GenePool

Inherits:
Object
  • Object
show all
Defined in:
lib/gene_pool.rb

Overview

Generic connection pool class

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}, &connect_block) ⇒ GenePool

Creates a gene_pool. The passed block will be used to initialize a single instance of the item being pooled (i.e., socket connection or whatever) options -

name         - The name used in logging messages.
pool_size    - The maximum number of instances that will be created (Defaults to 1).
timeout      - Will raise a Timeout exception if waiting on a connection for this many seconds.
warn_timeout - Displays an error message if a checkout takes longer that the given time (used to give hints to increase the pool size).
idle_timeout - If set, the connection will be renewed if it hasn't been used in this amount of time (seconds).
logger       - The logger used for log messages, defaults to STDERR.
close_proc   - The process or method used to close a pooled instance when it is removed.
  Defaults to :close.  Set to nil for no-op or a symbol for a method or a proc that takes an argument for the instance.


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/gene_pool.rb', line 20

def initialize(options={}, &connect_block)
  @connect_block = connect_block

  @name         = options[:name]         || 'GenePool'
  @pool_size    = options[:pool_size]    || 1
  @timeout      = options[:timeout]
  @warn_timeout = options[:warn_timeout] || 5.0
  @idle_timeout = options[:idle_timeout]
  @logger       = options[:logger]
  @close_proc   = options[:close_proc]   || (!options.has_key?(:close_proc) && :close)

  unless @logger
    @logger = Logger.new(STDERR)
    @logger.level = Logger::INFO
  end

  @connections = []
  @checked_out = []
  # Map the original connections object_id within the with_connection method to the final connection.
  # This could change if the connection is renew'ed.
  @with_map    = {}

  setup_mutex
end

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



7
8
9
# File 'lib/gene_pool.rb', line 7

def logger
  @logger
end

#nameObject

Returns the value of attribute name.



7
8
9
# File 'lib/gene_pool.rb', line 7

def name
  @name
end

#pool_sizeObject

Returns the value of attribute pool_size.



7
8
9
# File 'lib/gene_pool.rb', line 7

def pool_size
  @pool_size
end

#warn_timeoutObject

Returns the value of attribute warn_timeout.



7
8
9
# File 'lib/gene_pool.rb', line 7

def warn_timeout
  @warn_timeout
end

Instance Method Details

#checkin(connection) ⇒ Object

Return a connection to the pool.



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/gene_pool.rb', line 107

def checkin(connection)
  @mutex.synchronize do
    @checked_out.delete(connection)
    if @pool_size < @connections.size
      remove_and_close(connection)
      @logger.info "#{@name}: Checkin connection #{connection}(#{connection.object_id}) has been removed due to pool size reduction"
    else
      connection._last_used = Time.now
      @condition.signal
    end
  end
  @logger.debug {"#{@name}: Checkin connection #{connection}(#{connection.object_id}) self=#{self}"}
end

#checkoutObject

Check out a connection from the pool, creating it if necessary.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/gene_pool.rb', line 66

def checkout
  start_time = Time.now
  connection = nil
  reserved_connection_placeholder = Thread.current
  begin
    @mutex.synchronize do
      raise "Can't perform checkout, #{@name} has been closed" if @pool_size == 0
      until connection do
        if @checked_out.size < @connections.size
          connection = (@connections - @checked_out).first
          @checked_out << connection
        elsif @connections.size < @pool_size
          # Perform the actual connection outside the mutex
          connection = reserved_connection_placeholder
          @connections << connection
          @checked_out << connection
          @logger.debug {"#{@name}: Created connection ##{@connections.size} #{connection}(#{connection.object_id}) for #{name}"}
        else
          @logger.info "#{@name}: Waiting for an available connection, all #{@pool_size} connections are checked out."
          wait_mutex(start_time)
        end
      end
    end
  ensure
    delta = Time.now - start_time
    if delta > @warn_timeout
      @logger.warn "#{@name}: It took #{delta} seconds to obtain a connection.  Consider raising the pool size which is " +
        "currently set to #{@pool_size}."
    end
  end
  if connection == reserved_connection_placeholder
    connection = renew(reserved_connection_placeholder)
  elsif @idle_timeout && (Time.now - connection._last_used) >= @idle_timeout
    connection = renew(connection)
  end

  @logger.debug {"#{@name}: Checkout connection #{connection}(#{connection.object_id}) self=#{self}"}
  return connection
end

#close(timeout = 10) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/gene_pool.rb', line 230

def close(timeout=10)
  self.pool_size = 0
  start_time = Time.now
  while (Time.now - start_time) < timeout
    sleep 1
    @mutex.synchronize do
      return if @connections.empty?
      @logger.info "#{@name}: Waiting to close, #{@connections.size} connections are still in use"
    end
  end
  @logger.warn "#{@name}: Timed out while waiting to close, #{@connections.size} connections are still in use"
end

#eachObject

Perform the given block for each connection. Note that close should be used for safely closing all connections This should probably only ever be used to allow interrupt of a connection that is checked out?



224
225
226
227
228
# File 'lib/gene_pool.rb', line 224

def each
  @mutex.synchronize do
    @connections.each { |connection| yield connection }
  end
end

#remove(connection) ⇒ Object

Remove an existing connection from the pool



179
180
181
182
183
184
185
186
187
# File 'lib/gene_pool.rb', line 179

def remove(connection)
  @mutex.synchronize do
    @connections.delete(connection)
    @checked_out.delete(connection)
    @condition.signal
  end
  close_connection(connection)
  @logger.debug {"#{@name}: Removed connection #{connection}(#{connection.object_id}) self=#{self}"}
end

#remove_idle(idle_time = 60) ⇒ Object



243
244
245
246
247
248
249
250
251
252
# File 'lib/gene_pool.rb', line 243

def remove_idle(idle_time=60)
  @mutex.synchronize do
    (@connections - @checked_out).each do |idle_connection|
      if (Time.now - idle_connection._last_used) >= idle_time
        remove_and_close(idle_connection)
        @logger.debug {"#{@name}: Removed idle connection=#{idle_connection}(#{idle_connection.object_id})"}
      end
    end
  end
end

#renew(old_connection) ⇒ Object

If a connection needs to be renewed for some reason, reassign it here



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/gene_pool.rb', line 190

def renew(old_connection)
  new_connection =
    begin
      @connect_block.call
    rescue Exception
      remove old_connection
      raise
    end
  class << new_connection
    attr_accessor :_last_used
  end
  @mutex.synchronize do
    index = @checked_out.index(old_connection)
    raise Error.new("Can't reassign non-checked out connection for #{@name}") unless index
    close_connection(old_connection)
    @checked_out[index] = new_connection
    @connections[@connections.index(old_connection)] = new_connection

    # If this is part of a with_connection block, then track our new connection
    if @with_map.respond_to?(:key)
      with_key = @with_map.key(old_connection)
    else
      # 1.8 compatibility
      with_key = @with_map.index(old_connection)
    end

    @with_map[with_key] = new_connection if with_key
  end
  @logger.debug {"#{@name}: Renewed connection old=#{old_connection.object_id} new=#{new_connection}(#{new_connection.object_id})"}
  return new_connection
end

#sizeObject



45
46
47
48
49
# File 'lib/gene_pool.rb', line 45

def size
  @mutex.synchronize do
    return @connections.size
  end
end

#to_sObject



254
255
256
257
258
259
260
261
262
# File 'lib/gene_pool.rb', line 254

def to_s
  conn = chk = with = nil
  @mutex.synchronize do
    conn = @connections.map{|c| c.object_id}.join(',')
    chk  = @checked_out.map{|c| c.object_id}.join(',')
    with = @with_map.keys.map{|k| "#{k}=#{@with_map[k].object_id}"}.join(',')
  end
  "connections=#{conn} checked_out=#{chk} with_map=#{with}"
end

#with_connectionObject

Create a scope for checking out a connection The client should handle cleanup on exception which should be something similar to the following:

rescue Exception => e
  @gene_pool.remove(connection)
  raise
end

Note that with_connection_auto_remove automatically does this



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/gene_pool.rb', line 128

def with_connection
  connection = checkout
  @mutex.synchronize do
    @with_map[connection.object_id] = connection
  end
  begin
    yield connection
  ensure
    @mutex.synchronize do
      # Update connection for any renew's that have occurred
      connection = @with_map.delete(connection.object_id)
    end
    checkin(connection) if connection
  end
end

#with_connection_auto_removeObject

Create a scope for checking out a connection while automatically removing this connection on exception



145
146
147
148
149
150
151
152
153
154
# File 'lib/gene_pool.rb', line 145

def with_connection_auto_remove
  with_connection do |connection|
    begin
      yield connection
    rescue Exception => e
      remove(connection)
      raise
    end
  end
end

#with_connection_auto_retryObject

Create a scope for checking out a connection while automatically retrying on exception



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/gene_pool.rb', line 158

def with_connection_auto_retry
  with_connection do |connection|
    begin
      yield connection
    rescue Exception => e
      if e.kind_of?(Timeout::Error) || e.message =~ /expired/
        remove(connection)
        raise
      end
      connection = renew(connection)
      begin
        yield connection
      rescue Exception => e
        remove(connection)
        raise
      end
    end
  end
end