Class: FiberConnectionPool

Inherits:
Object
  • Object
show all
Defined in:
lib/fiber_connection_pool.rb

Constant Summary collapse

VERSION =
'0.3.2'
RESERVED_TTL_SECS =

reserved cleanup trigger

30
SAVED_DATA_TTL_SECS =

saved_data cleanup trigger

30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ FiberConnectionPool

Initializes the pool with ‘size’ instances running the given block to get each one. Ex:

pool = FiberConnectionPool.new(:size => 5) { MyConnection.new }

Raises:

  • (ArgumentError)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/fiber_connection_pool.rb', line 19

# Build the pool and fill it with connections.
#
# opts - Hash of options. opts[:size] (converted with to_i) is the number of
#        connections to create and must be greater than zero.
#
# The given block is run once per slot; each return value becomes one pooled
# connection. Ex:
#
#   pool = FiberConnectionPool.new(:size => 5) { MyConnection.new }
#
# Raises ArgumentError when the size is not positive or when no block is
# given (both are checked before any state is set up, so a bad call never
# leaves a half-initialized pool behind).
def initialize(opts)
  size = opts[:size].to_i
  raise ArgumentError, 'size > 0 is mandatory' if size <= 0
  raise ArgumentError, 'a block returning a new connection is mandatory' unless block_given?

  @size = size

  @saved_data = {} # placeholder for requested save data
  @reserved  = {}   # map of in-progress connections
  @treated_exceptions = [ PlaceholderException ]  # list of Exception classes that need further connection treatment
  @last_reserved_cleanup = Time.now # reserved cleanup trigger
  @pending   = []   # pending reservations (FIFO)
  @save_data_requests = {} # blocks to be yielded to save data
  @last_data_cleanup = Time.now # saved_data cleanup trigger
  @keep_connection = {} # keep reserved connections for fiber

  # pool of free connections, one block invocation per slot
  @available = Array.new(@size) { yield }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &blk) ⇒ Object (private)

Allow the pool to behave as the underlying connection

Yield the connection within the execute method and release it once the call is complete (assumption: the fiber will yield while waiting for IO, allowing the reactor to run other fibers).



254
255
256
257
258
# File 'lib/fiber_connection_pool.rb', line 254

# Let the pool stand in for the underlying connection: any method the pool
# does not define is forwarded to a pooled connection.
#
# method - name of the missing method.
# args   - arguments of the original call, forwarded unchanged.
# blk    - optional block of the original call, forwarded unchanged.
#
# The call is wrapped in #execute, which receives the method name and the
# argument list and yields the connection to use.
#
# Returns whatever #execute returns.
def method_missing(method, *args, &blk)
  execute(method, args) do |conn|
    # __send__ rather than send: connection classes may define their own
    # `send` (socket-like objects do), which would hijack the dispatch.
    conn.__send__(method, *args, &blk)
  end
end

Instance Attribute Details

#saved_dataObject

Returns the value of attribute saved_data.



10
11
12
# File 'lib/fiber_connection_pool.rb', line 10

# Reader for @saved_data, the hash holding the data gathered through
# save_data requests. #gathered_data looks entries up by fiber.
def saved_data
  @saved_data
end

#sizeObject (readonly)

Returns the value of attribute size.



12
13
14
# File 'lib/fiber_connection_pool.rb', line 12

# Reader for @size, the number of connections requested when the pool was
# built (opts[:size] converted to an integer).
def size
  @size
end

#treated_exceptionsObject

Returns the value of attribute treated_exceptions.



10
11
12
# File 'lib/fiber_connection_pool.rb', line 10

# Reader for @treated_exceptions, the list of Exception classes that need
# further connection treatment. It starts as [ PlaceholderException ].
def treated_exceptions
  @treated_exceptions
end

Instance Method Details

#acquire(fiber = nil, opts = { :keep => true }) ⇒ Object

Acquire a lock on a connection and assign it to the given fiber. If no connection is available, the fiber is pushed onto the pending array and execution yields until it is resumed.

If :keep => true is given (the default), the connection is kept for the fiber; you must call ‘release’ at some point.

Ex:

# Usage example: run the caller's block inside one SQL transaction.
# The connection is acquired up front, so every query issued by this fiber
# until `release` goes through the same reserved connection.
def transaction
  @pool.acquire          # reserve one instance for this fiber
  @pool.query 'BEGIN'    # start SQL transaction

  yield                  # perform queries inside the transaction

  @pool.query 'COMMIT'   # confirm it
rescue => ex
  @pool.query 'ROLLBACK' # discard it
  raise ex
ensure
  @pool.release          # always release it back
end


166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/fiber_connection_pool.rb', line 166

# Reserve a connection for a fiber, waiting until one becomes free.
#
# fiber - the fiber that will own the connection (default: Fiber.current).
# opts  - options hash; when opts[:keep] is truthy the fiber is flagged in
#         @keep_connection (default: { :keep => true }).
#
# If the fiber already holds a reservation, that connection is returned.
# Otherwise a connection is popped from @available and recorded in
# @reserved. When none is free, the fiber is appended to @pending and the
# running fiber yields (passing @pending out); the whole procedure is
# retried once it is resumed.
#
# Returns the connection reserved for the fiber.
def acquire(fiber = nil, opts = { :keep => true })
  owner = fiber.nil? ? Fiber.current : fiber

  while true
    @keep_connection[owner] = true if opts[:keep]

    held = @reserved[owner]
    return held if held # already reserved? then use it

    free = @available.pop
    if free
      @reserved[owner] = free
      return free
    end

    # nothing free: queue up, suspend, and try again after being resumed
    @pending.push(owner)
    Fiber.yield @pending
  end
end

#clear_save_data_requestsObject

Clear any save_data requests in the pool. No data will be saved after this, unless new requests are added with #save_data.



71
72
73
# File 'lib/fiber_connection_pool.rb', line 71

# Drop every registered save_data request by replacing @save_data_requests
# with a fresh empty hash. Returns that new empty hash.
def clear_save_data_requests
  @save_data_requests = {}
end

#gathered_dataObject

Return the gathered data for this fiber



64
65
66
# File 'lib/fiber_connection_pool.rb', line 64

# Look up the data gathered for the fiber that is currently running.
#
# Returns the @saved_data entry stored under Fiber.current, or nil when
# nothing has been stored for it.
def gathered_data
  running = Fiber.current
  @saved_data[running]
end

#has_connection?(conn) ⇒ Boolean

True if the given connection is anywhere inside the pool

Returns:

  • (Boolean)


100
101
102
# File 'lib/fiber_connection_pool.rb', line 100

# Tell whether a connection belongs to this pool.
#
# conn - the connection instance to look for.
#
# Returns true when conn equals (==) a free connection in @available or a
# connection currently held in @reserved, false otherwise.
def has_connection?(conn)
  @available.include?(conn) || @reserved.value?(conn)
end

#query(sql, *args) ⇒ Object

Avoid method_missing stack for ‘query’



92
93
94
95
96
# File 'lib/fiber_connection_pool.rb', line 92

# Explicit shortcut for the common 'query' call, so it does not have to go
# through method_missing.
#
# sql  - the statement handed to the connection's #query.
# args - extra arguments, forwarded after sql.
#
# #execute is called with the name 'query' and the extra arguments (sql is
# not included in that list) and yields the connection to use.
#
# Returns whatever #execute returns.
def query(sql, *args)
  execute('query', args) { |connection| connection.query(sql, *args) }
end

#recreate_connection(new_conn) ⇒ Object

DEPRECATED: use with_failed_connection



105
106
107
# File 'lib/fiber_connection_pool.rb', line 105

# DEPRECATED: use #with_failed_connection instead.
#
# new_conn - the replacement connection.
#
# Delegates to #with_failed_connection with a block that ignores the failed
# connection and simply hands back new_conn.
#
# Returns whatever #with_failed_connection returns.
def recreate_connection(new_conn)
  with_failed_connection do |_failed_conn|
    new_conn
  end
end

#release(fiber = nil) ⇒ Object

Release the connection assigned to the supplied fiber and resume any other pending reservations (which will immediately try to run acquire on the pool). Also performs cleanup if the TTL has passed.



184
185
186
187
188
189
190
191
192
193
194
# File 'lib/fiber_connection_pool.rb', line 184

# Give a fiber's connection back to the pool.
#
# fiber - the fiber whose reservation is released (default: Fiber.current).
#
# The fiber's keep flag is cleared and its reserved connection, if it has
# one, is put at the front of @available. Releasing a fiber that holds no
# reservation (for instance a second release in a row) leaves @available
# untouched, so nil can never enter the pool.
#
# Afterwards #reserved_cleanup runs when at least RESERVED_TTL_SECS seconds
# have passed since the last cleanup, and #notify_new_is_available is called.
#
# Returns whatever #notify_new_is_available returns.
def release(fiber = nil)
  fiber = Fiber.current if fiber.nil?
  @keep_connection.delete fiber

  # only a real reservation goes back; delete returns nil when there is none
  conn = @reserved.delete(fiber)
  @available.unshift(conn) unless conn.nil?

  # try cleanup
  reserved_cleanup if (Time.now - @last_reserved_cleanup) >= RESERVED_TTL_SECS

  notify_new_is_available
end

#release_data(fiber) ⇒ Object

Delete any saved_data for given fiber



77
78
79
# File 'lib/fiber_connection_pool.rb', line 77

# Remove the @saved_data entry stored under the given fiber.
#
# fiber - the key whose saved data is discarded.
#
# Returns the removed entry, or nil when nothing was stored for the fiber.
def release_data(fiber)
  @saved_data.delete(fiber)
end

#reserved_cleanupObject

Release any reserved connections held for dead fibers



134
135
136
137
138
139
140
141
142
# File 'lib/fiber_connection_pool.rb', line 134

# Sweep out state that belongs to fibers which are no longer alive.
#
# Stamps @last_reserved_cleanup with the current time, then calls #release
# for every dead fiber found in @reserved and drops every dead fiber from
# @keep_connection. Both passes walk a snapshot taken beforehand, so the
# hashes can safely change while the sweep is running.
#
# Returns the snapshot of @keep_connection taken before the sweep.
def reserved_cleanup
  @last_reserved_cleanup = Time.now

  @reserved.keys.each do |fiber|
    release(fiber) unless fiber.alive?
  end

  @keep_connection.dup.each_key do |fiber|
    @keep_connection.delete(fiber) unless fiber.alive?
  end
end

#save_data(key, &block) ⇒ Object

Add a save_data request to the pool. The given block will be executed after each successful call to any method on the connection. The connection, the method name and the call's arguments are passed to the block.

The returned value will be saved in pool.gathered_data, and will be kept as long as the fiber stays alive.

Ex:

# (...right after pool's creation...)
# The block's value is what gets saved. Use `next` for an early result:
# the block is stored and called later by the pool, where a `return`
# would raise LocalJumpError.
pool.save_data(:hey_or_hoo) do |conn, method, args|
  next 'hey' if method == 'query'
  'hoo'
end

# (...from a reactor fiber...)
pool.query('select anything from anywhere')
puts pool.gathered_data[:hey_or_hoo]
  => 'hey'


58
59
60
# File 'lib/fiber_connection_pool.rb', line 58

# Register a save_data request.
#
# key   - name under which the request is stored in @save_data_requests; a
#         request already stored under the same key is replaced.
# block - the block to remember for this key.
#
# Returns the stored block.
def save_data(key, &block)
  @save_data_requests.store(key, block)
end

#save_data_cleanupObject

Delete any saved_data held for dead fibers



83
84
85
86
87
88
# File 'lib/fiber_connection_pool.rb', line 83

# Discard the saved data of every fiber that is no longer alive, then
# record the time of this sweep in @last_data_cleanup.
#
# Returns the Time stored in @last_data_cleanup.
def save_data_cleanup
  dead_fibers = @saved_data.keys.reject { |fiber| fiber.alive? }
  dead_fibers.each { |fiber| @saved_data.delete(fiber) }

  @last_data_cleanup = Time.now
end

#with_failed_connectionObject

Identify the connection that just failed for the current fiber. Pass it to the given block, which must return a valid connection instance. After that, put the new connection into the pool in the failed connection’s place. Raises NoReservedConnection if it cannot find the failed connection instance.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/fiber_connection_pool.rb', line 114

# Swap out the connection that just failed for the current fiber.
#
# The connection reserved for Fiber.current is yielded to the block, and the
# block's return value is taken as its replacement. Every occurrence of the
# failed connection is then removed from @available and @reserved.
#
# If the fiber is flagged in @keep_connection (it acquired the connection
# manually), the replacement is reserved for it again; otherwise the
# replacement goes to the front of @available and
# #notify_new_is_available is called.
#
# Returns the replacement when it is kept for the fiber, otherwise whatever
# #notify_new_is_available returns.
# Raises NoReservedConnection when the current fiber holds no reservation.
def with_failed_connection
  owner = Fiber.current
  failed = @reserved[owner]
  raise NoReservedConnection.new if failed.nil?

  replacement = yield failed

  # purge every trace of the failed instance
  @available.delete_if { |conn| conn == failed }
  @reserved.delete_if { |_fiber, conn| conn == failed }

  unless @keep_connection[owner]
    @available.unshift replacement # not kept: release into the pool
    return notify_new_is_available
  end

  # manually acquired: keep it reserved, it may still be useful
  @reserved[owner] = replacement
end