Class: FiberConnectionPool
- Inherits:
-
Object
- Object
- FiberConnectionPool
- Defined in:
- lib/fiber_connection_pool.rb
Constant Summary collapse
- VERSION =
'0.3.2'- RESERVED_TTL_SECS =
reserved cleanup trigger
30- SAVED_DATA_TTL_SECS =
saved_data cleanup trigger
30
Instance Attribute Summary collapse
-
#saved_data ⇒ Object
Returns the value of attribute saved_data.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
-
#treated_exceptions ⇒ Object
Returns the value of attribute treated_exceptions.
Instance Method Summary collapse
-
#acquire(fiber = nil, opts = { :keep => true }) ⇒ Object
Acquire a lock on a connection and assign it to given fiber If no connection is available, yield the given fiber on the pending array.
-
#clear_save_data_requests ⇒ Object
Clear any save_data requests in the pool.
-
#gathered_data ⇒ Object
Return the gathered data for this fiber.
-
#has_connection?(conn) ⇒ Boolean
True if the given connection is anywhere inside the pool.
-
#initialize(opts) ⇒ FiberConnectionPool
constructor
Initializes the pool with ‘size’ instances running the given block to get each one.
-
#query(sql, *args) ⇒ Object
Avoid method_missing stack for ‘query’.
-
#recreate_connection(new_conn) ⇒ Object
DEPRECATED: use with_failed_connection.
-
#release(fiber = nil) ⇒ Object
Release connection assigned to the supplied fiber and resume any other pending connections (which will immediately try to run acquire on the pool) Also perform cleanup if TTL is past.
-
#release_data(fiber) ⇒ Object
Delete any saved_data for given fiber.
-
#reserved_cleanup ⇒ Object
Delete any reserved held for dead fibers.
-
#save_data(key, &block) ⇒ Object
Add a save_data request to the pool.
-
#save_data_cleanup ⇒ Object
Delete any saved_data held for dead fibers.
-
#with_failed_connection ⇒ Object
Identify the connection that just failed for current fiber.
Constructor Details
#initialize(opts) ⇒ FiberConnectionPool
Initializes the pool with ‘size’ instances running the given block to get each one. Ex:
pool = FiberConnectionPool.new(:size => 5) { MyConnection.new }
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/fiber_connection_pool.rb', line 19 def initialize(opts) raise ArgumentError.new('size > 0 is mandatory') if opts[:size].to_i <= 0 @size = opts[:size].to_i @saved_data = {} # placeholder for requested save data @reserved = {} # map of in-progress connections @treated_exceptions = [ PlaceholderException ] # list of Exception classes that need further connection treatment @last_reserved_cleanup = Time.now # reserved cleanup trigger @available = [] # pool of free connections @pending = [] # pending reservations (FIFO) @save_data_requests = {} # blocks to be yielded to save data @last_data_cleanup = Time.now # saved_data cleanup trigger @keep_connection = {} # keep reserved connections for fiber @available = Array.new(@size) { yield } end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &blk) ⇒ Object (private)
Allow the pool to behave as the underlying connection
Yield the connection within execute method and release once it is complete (assumption: fiber will yield while waiting for IO, allowing the reactor run other fibers)
254 255 256 257 258 |
# File 'lib/fiber_connection_pool.rb', line 254 def method_missing(method, *args, &blk) execute(method, args) do |conn| conn.send(method, *args, &blk) end end |
Instance Attribute Details
#saved_data ⇒ Object
Returns the value of attribute saved_data.
10 11 12 |
# File 'lib/fiber_connection_pool.rb', line 10 def saved_data @saved_data end |
#size ⇒ Object (readonly)
Returns the value of attribute size.
12 13 14 |
# File 'lib/fiber_connection_pool.rb', line 12 def size @size end |
#treated_exceptions ⇒ Object
Returns the value of attribute treated_exceptions.
10 11 12 |
# File 'lib/fiber_connection_pool.rb', line 10 def treated_exceptions @treated_exceptions end |
Instance Method Details
#acquire(fiber = nil, opts = { :keep => true }) ⇒ Object
Acquire a lock on a connection and assign it to given fiber If no connection is available, yield the given fiber on the pending array
If :keep => true is given (by default), connection is kept, you must call ‘release’ at some point
Ex:
def transaction
@pool.acquire # reserve one instance for this fiber
@pool.query 'BEGIN' # start SQL transaction
yield # perform queries inside the transaction
@pool.query 'COMMIT' # confirm it
rescue => ex
@pool.query 'ROLLBACK' # discard it
raise ex
ensure
@pool.release # always release it back
end
166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/fiber_connection_pool.rb', line 166 def acquire(fiber = nil, opts = { :keep => true }) fiber = Fiber.current if fiber.nil? @keep_connection[fiber] = true if opts[:keep] return @reserved[fiber] if @reserved[fiber] # already reserved? then use it if conn = @available.pop @reserved[fiber] = conn conn else Fiber.yield @pending.push fiber acquire(fiber,opts) end end |
#clear_save_data_requests ⇒ Object
Clear any save_data requests in the pool. No data will be saved after this, unless new requests are added with #save_data.
71 72 73 |
# File 'lib/fiber_connection_pool.rb', line 71 def clear_save_data_requests @save_data_requests = {} end |
#gathered_data ⇒ Object
Return the gathered data for this fiber
64 65 66 |
# File 'lib/fiber_connection_pool.rb', line 64 def gathered_data @saved_data[Fiber.current] end |
#has_connection?(conn) ⇒ Boolean
True if the given connection is anywhere inside the pool
100 101 102 |
# File 'lib/fiber_connection_pool.rb', line 100 def has_connection?(conn) (@available + @reserved.values).include?(conn) end |
#query(sql, *args) ⇒ Object
Avoid method_missing stack for ‘query’
92 93 94 95 96 |
# File 'lib/fiber_connection_pool.rb', line 92 def query(sql, *args) execute('query', args) do |conn| conn.query sql, *args end end |
#recreate_connection(new_conn) ⇒ Object
DEPRECATED: use with_failed_connection
105 106 107 |
# File 'lib/fiber_connection_pool.rb', line 105 def recreate_connection(new_conn) with_failed_connection { new_conn } end |
#release(fiber = nil) ⇒ Object
Release connection assigned to the supplied fiber and resume any other pending connections (which will immediately try to run acquire on the pool) Also perform cleanup if TTL is past
184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/fiber_connection_pool.rb', line 184 def release(fiber = nil) fiber = Fiber.current if fiber.nil? @keep_connection.delete fiber @available.unshift @reserved.delete(fiber) # try cleanup reserved_cleanup if (Time.now - @last_reserved_cleanup) >= RESERVED_TTL_SECS notify_new_is_available end |
#release_data(fiber) ⇒ Object
Delete any saved_data for given fiber
77 78 79 |
# File 'lib/fiber_connection_pool.rb', line 77 def release_data(fiber) @saved_data.delete(fiber) end |
#reserved_cleanup ⇒ Object
Delete any reserved held for dead fibers
134 135 136 137 138 139 140 141 142 |
# File 'lib/fiber_connection_pool.rb', line 134 def reserved_cleanup @last_reserved_cleanup = Time.now @reserved.dup.each do |k,v| release(k) if not k.alive? end @keep_connection.dup.each do |k,v| @keep_connection.delete(k) if not k.alive? end end |
#save_data(key, &block) ⇒ Object
Add a save_data request to the pool. The given block will be executed after each successful call to -any- method on the connection. The connection and the method name are passed to the block.
The returned value will be saved in pool.gathered_data, and will be kept as long as the fiber stays alive.
Ex:
# (...right after pool's creation...)
pool.save_data(:hey_or_hoo) do |conn, method, args|
return 'hey' if method == 'query'
'hoo'
end
# (...from a reactor fiber...)
pool.query('select anything from anywhere')
puts pool.gathered_data[:hey_or_hoo]
=> 'hey'
58 59 60 |
# File 'lib/fiber_connection_pool.rb', line 58 def save_data(key, &block) @save_data_requests[key] = block end |
#save_data_cleanup ⇒ Object
Delete any saved_data held for dead fibers
83 84 85 86 87 88 |
# File 'lib/fiber_connection_pool.rb', line 83 def save_data_cleanup @saved_data.dup.each do |k,v| @saved_data.delete(k) if not k.alive? end @last_data_cleanup = Time.now end |
#with_failed_connection ⇒ Object
Identify the connection that just failed for current fiber. Pass it to the given block, which must return a valid instance of connection. After that, put the new connection into the pool in failed connection’s place. Raises NoReservedConnection if cannot find the failed connection instance.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/fiber_connection_pool.rb', line 114 def with_failed_connection fiber = Fiber.current bad_conn = @reserved[fiber] raise NoReservedConnection.new if bad_conn.nil? new_conn = yield bad_conn @available.reject!{ |v| v == bad_conn } @reserved.reject!{ |k,v| v == bad_conn } # we should keep it if manually acquired, # just in case it is still useful if @keep_connection[fiber] then @reserved[fiber] = new_conn else @available.unshift new_conn # or else release into the pool notify_new_is_available end end |