Class: HotTub::Pool

Inherits:
Object
  • Object
show all
Includes:
KnownClients, Reaper::Mixin
Defined in:
lib/hot_tub/pool.rb

Constant Summary

Constants included from KnownClients

KnownClients::KNOWN_CLIENTS

Instance Attribute Summary collapse

Attributes included from Reaper::Mixin

#reap_timeout, #reaper, #shutdown

Instance Method Summary collapse

Methods included from Reaper::Mixin

#kill_reaper, #spawn_reaper

Methods included from KnownClients

#clean_client, #close_client, #reap_client?

Constructor Details

#initialize(opts = {}, &client_block) ⇒ Pool

Thread-safe lazy connection pool

Example Net::HTTP

pool = HotTub::Pool.new(:size => 10) {
  uri = URI.parse("http://somewebservice.com")
  http = Net::HTTP.new(uri.host, uri.port)
  http.start
  http
}
pool.run {|clnt| puts clnt.head('/').code }

Example Redis

# We don't want too many connections so we set our :max_size Under load our pool
# can grow to 30 connections. Once load dies down our pool can be reaped back down to 5
pool = HotTub::Pool.new(:size => 5, :max_size => 30, :reap_timeout => 60) { Redis.new }
pool.set('hot', 'stuff')
pool.get('hot')
# => 'stuff'

HotTub::Pool defaults never_block to true, which means if we run out of clients simply create a new client to continue operations. The pool will grow and extra clients will be reused until activity dies down. If you would like to block and possibly throw an exception rather than temporarily grow the set :size, set :never_block to false; wait_timeout defaults to 10 seconds.

Example with set pool size (will throw HotTub::Pool::Timeout exception)

pool = HotTub::Pool.new(:size => 1, :max_size => 1, :wait_timeout => 0.5) {
  uri = URI.parse("http://someslowwebservice.com")
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = false
  http.start
  http
}
pool.run { |clnt| s clnt.head('/').code }

begin
  pool.run { |clnt| s clnt.head('/').code }
rescue HotTub::Pool::Timeout => e
  puts "Waited too long for a client: #{e}"
end

OPTIONS

:name

A string representing the name of your pool used for logging.

:size

Default is 5. An integer that sets the size of the pool. Could be describe as minimum size the pool should grow to.

:max_size

Default is 0. An integer that represents the maximum number of connections allowed when :non_blocking is true. If set to 0, which is the default, there is no limit; connections will continue to open until load subsides long enough for reaping to occur.

:wait_timeout

Default is 10 seconds. An integer that represents the timeout when waiting for a client from the pool in seconds. After said time a HotTub::Pool::Timeout exception will be thrown

:close

Default is nil. Can be a symbol representing an method to call on a client to close the client or a lambda that accepts the client as a parameter that will close a client. The close option is performed on clients on reaping and shutdown after the client has been removed from the pool. When nil, as is the default, no action is performed.

:clean

Default is nil. Can be a symbol representing an method to call on a client to clean the client or a lambda that accepts the client as a parameter that will clean a client. When nil, as is the default, no action is performed.

:reap?

Default is nil. Can be a symbol representing an method to call on a client that returns a boolean marking a client for reaping, or a lambda that accepts the client as a parameter that returns a boolean boolean marking a client for reaping. When nil, as is the default, no action is performed.

:reaper

If set to false prevents, a HotTub::Reaper from initializing and all reaping will occur when the clients are returned to the pool, blocking the current thread.

:reap_timeout

Default is 600 seconds. An integer that represents the timeout for reaping the pool in seconds.

:detect_fork

Set to false to disable fork detection

Raises:

  • (ArgumentError)


93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/hot_tub/pool.rb', line 93

def initialize(opts={},&client_block)
  raise ArgumentError, 'a block that initializes a new client is required' unless block_given?
  @name             = (opts[:name] || self.class.name)
  @size             = (opts[:size] || 5)
  @wait_timeout     = (opts[:wait_timeout] || 10)   # in seconds
  @reap_timeout     = (opts[:reap_timeout] || 600)  # the interval to reap connections in seconds
  @max_size         = (opts[:max_size] || 0)        # maximum size of pool when non-blocking, 0 means no limit

  @close_client     = opts[:close]                  # => lambda {|clnt| clnt.close} or :close
  @clean_client     = opts[:clean]                  # => lambda {|clnt| clnt.clean} or :clean
  @reap_client      = opts[:reap?]                  # => lambda {|clnt| clnt.reap?} or :reap? # should return boolean
  @client_block     = client_block

  @_pool            = []    # stores available clients
  @_pool.taint
  @_out             = {}    # stores all checked out clients
  @_out.taint

  @mutex            = Mutex.new
  @cond             = ConditionVariable.new

  @shutdown         = false

  @sessions_key     = opts[:sessions_key]
  @blocking_reap    = (opts[:reaper] == false && !@sessions_key)
  @reaper           = ((@sessions_key || (opts[:reaper] == false)) ? false : spawn_reaper)

  @never_block      = (@max_size == 0)

  @pid              = Process.pid unless opts[:detect_fork] == false

  at_exit {shutdown!} unless @sessions_key
end

Instance Attribute Details

#nameObject

Returns the value of attribute name.



6
7
8
# File 'lib/hot_tub/pool.rb', line 6

def name
  @name
end

Instance Method Details

#clean!Object

Clean all clients currently checked into the pool. Its possible clients may be returned to the pool after cleaning



139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/hot_tub/pool.rb', line 139

def clean!
  HotTub.logger.info "[HotTub] Cleaning pool #{@name}!" if HotTub.logger
  @mutex.synchronize do
    begin
      @_pool.each do |clnt|
        clean_client(clnt)
      end
    ensure
      @cond.signal
    end
  end
  nil
end

#current_sizeObject



220
221
222
223
224
# File 'lib/hot_tub/pool.rb', line 220

def current_size
  @mutex.synchronize do
    _total_current_size
  end
end

#drain!Object Also known as: close!, reset!

Drain the pool of all clients currently checked into the pool. After draining, wake all sleeping threads to allow repopulating the pool or if shutdown allow threads to quickly finish their work Its possible clients may be returned to the pool after cleaning



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/hot_tub/pool.rb', line 157

def drain!
  HotTub.logger.info "[HotTub] Draining pool #{@name}!" if HotTub.logger
  @mutex.synchronize do
    begin
      while clnt = @_pool.pop
        close_client(clnt)
      end
    ensure
      @_out.clear
      @_pool.clear
      @pid = Process.pid
      @cond.broadcast
    end
  end
  nil
end

#forked?Boolean

Returns:

  • (Boolean)


233
234
235
# File 'lib/hot_tub/pool.rb', line 233

def forked?
  (@pid && (@pid != Process.pid))
end

#max_size=(max_size) ⇒ Object

We must reset our @never_block cache when we set max_size after initialization



228
229
230
231
# File 'lib/hot_tub/pool.rb', line 228

def max_size=max_size
  @never_block = (max_size == 0)
  @max_size = max_size
end

#reap!Object

Remove and close extra clients Releases mutex each iteration because reaping is a low priority action



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/hot_tub/pool.rb', line 189

def reap!
  HotTub.logger.info "[HotTub] Reaping pool #{@name}!" if HotTub.log_trace?
  while !@shutdown
    reaped = nil
    @mutex.synchronize do
      begin
        if _reap?
          if _dead_clients?
            reaped = @_out.select { |clnt, thrd| !thrd.alive? }.keys
            @_out.delete_if { |k,v| reaped.include? k }
          else
            reaped = [@_pool.shift]
          end
        else
          reaped = nil
        end
      ensure
        @cond.signal
      end
    end
    if reaped
      reaped.each do |clnt|
        close_client(clnt)
      end
    else
      break
    end
  end
  nil
end

#runObject

Preform an operations with a client/connection. Requires a block that receives the client.



129
130
131
132
133
134
135
# File 'lib/hot_tub/pool.rb', line 129

def run
  drain! if forked?
  clnt = pop
  yield clnt
ensure
  push(clnt)
end

#shutdown!Object

Kills the reaper and drains the pool.



177
178
179
180
181
182
183
184
# File 'lib/hot_tub/pool.rb', line 177

def shutdown!
  HotTub.logger.info "[HotTub] Shutting down pool #{@name}!" if HotTub.logger
  @shutdown = true
  kill_reaper if @reaper
  drain!
  @shutdown = false
  nil
end