Module: Rage::Ext::ActiveRecord::ConnectionPool

Defined in:
lib/rage/ext/active_record/connection_pool.rb

Defined Under Namespace

Classes: BlackHoleList

Instance Method Summary collapse

Instance Method Details

#__init_rage_extension ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rage/ext/active_record/connection_pool.rb', line 34

# Initializes the fiber-aware state of the pool and registers the Iodine
# callbacks that operate on it:
#
# * a recurring timer that rejects fibers waiting too long for a connection;
# * a subscriber that resumes one waiting fiber per "connection released" message;
# * a shutdown hook that removes the subscription.
def __init_rage_extension
  # a map of fibers that are currently waiting for a
  # connection in the format of { Fiber => timestamp }
  @__blocked = {}

  # a map of fibers that are currently holding connections
  # in the format of { Fiber => Connection }
  @__in_use = {}

  # a list of all DB connections that are currently idle
  @__connections = build_new_connections

  # how long a fiber can wait for a connection to become available
  @__checkout_timeout = checkout_timeout

  # how long a connection can be idle for before disconnecting
  @__idle_timeout = reaper.frequency

  # how often should we check for fibers that wait for a connection for too long
  @__timeout_worker_frequency = 0.5

  # reject fibers that wait for a connection for more than `@__checkout_timeout`
  Iodine.run_every((@__timeout_worker_frequency * 1_000).to_i) do
    if @__blocked.length > 0
      current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      # Collect the expired waiters before touching any of them: `fiber.raise`
      # runs the fiber until it yields or finishes, and whatever it does in the
      # meantime (e.g. rescuing the error and queueing for a connection again)
      # must not happen while `@__blocked` is being iterated.
      expired = @__blocked.select do |_, blocked_since|
        (current_time - blocked_since) > @__checkout_timeout
      end

      expired.each do |fiber, blocked_since|
        # skip waiters that were removed or re-queued while an earlier fiber was running
        next unless @__blocked[fiber] == blocked_since

        @__blocked.delete(fiber)
        fiber.raise(ActiveRecord::ConnectionTimeoutError, "could not obtain a connection from the pool within #{@__checkout_timeout} seconds; all pooled connections were in use")
      end
    end
  end

  # resume blocked fibers once connections become available
  Iodine.subscribe("ext:ar-connection-released") do
    if @__blocked.length > 0 && @__connections.length > 0
      # waiters are resumed in the order they were queued
      f, _ = @__blocked.shift
      f.resume
    end
  end

  # unsubscribe on shutdown
  Iodine.on_state(:on_finish) do
    Iodine.unsubscribe("ext:ar-connection-released")
  end
end

#active_connection? ⇒ Boolean

Returns true if there is an open connection being used for the current fiber.

Returns:

  • (Boolean)


83
84
85
# File 'lib/rage/ext/active_record/connection_pool.rb', line 83

# Looks up the connection registered for the current fiber.
#
# @return [Object, nil] the connection held by the current fiber (truthy),
#   or `nil` when the fiber holds none
def active_connection?
  owner = Fiber.current
  @__in_use[owner]
end

#checkin(conn) ⇒ Object

Check in a database connection back into the pool, indicating that you no longer need this connection.



234
235
236
237
# File 'lib/rage/ext/active_record/connection_pool.rb', line 234

# Hands +conn+ back to the pool on behalf of whichever fiber is registered
# as holding it. When no fiber holds +conn+, the owner lookup yields `nil`
# and that `nil` is passed on to #release_connection.
#
# @param conn [Object] the connection to return
# @return [Object, nil] whatever #release_connection returns
def checkin(conn)
  release_connection(@__in_use.key(conn))
end

#checkout(_ = nil) ⇒ Object

Check out a database connection from the pool, indicating that you want to use it. You should call #checkin when you no longer need this.



229
230
231
# File 'lib/rage/ext/active_record/connection_pool.rb', line 229

# Checks out a connection for the current fiber by delegating to #connection.
# The optional positional argument is accepted and ignored.
#
# @return [Object] whatever #connection returns
def checkout(_ = nil)
  connection
end

#clear_reloadable_connections(raise_on_acquisition_timeout = true) ⇒ Object



247
248
249
# File 'lib/rage/ext/active_record/connection_pool.rb', line 247

# Delegates to #disconnect, forwarding the timeout flag unchanged.
#
# @param raise_on_acquisition_timeout [Boolean] passed through to #disconnect
# @return [Object] whatever #disconnect returns
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
  disconnect(raise_on_acquisition_timeout)
end

#clear_reloadable_connections! ⇒ Object



251
252
253
# File 'lib/rage/ext/active_record/connection_pool.rb', line 251

# Delegates to #disconnect with the timeout flag set to `false`.
#
# @return [Object] whatever #disconnect returns
def clear_reloadable_connections!
  disconnect(false)
end

#connected? ⇒ Boolean

Returns true if a connection has already been opened.

Returns:

  • (Boolean)


163
164
165
# File 'lib/rage/ext/active_record/connection_pool.rb', line 163

# Always reports the pool as connected.
#
# @return [true]
def connected?
  true
end

#connection ⇒ Object

Retrieve the connection associated with the current fiber, or obtain one if necessary.



88
89
90
91
92
93
94
95
96
# File 'lib/rage/ext/active_record/connection_pool.rb', line 88

# Returns the connection registered for the current fiber. When the fiber
# holds none, an idle connection is taken from the pool; when the pool has
# none to give, the fiber records itself in the waiting map and yields until
# it is resumed, then takes whatever the pool hands out at that point.
#
# @return [Object] the connection now registered for the current fiber
def connection
  owner = Fiber.current

  held = @__in_use[owner]
  return held if held

  acquired = @__connections.shift
  unless acquired
    # nothing idle: queue up with the time we started waiting and suspend
    @__blocked[owner] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    Fiber.yield

    acquired = @__connections.shift
  end

  @__in_use[owner] = acquired
end

#connections ⇒ Object

Returns an array containing the connections currently in the pool.



158
159
160
# File 'lib/rage/ext/active_record/connection_pool.rb', line 158

# Returns the idle-connection list converted with `to_a`. Connections that
# are currently checked out live in a separate map and are not included.
#
# @return [Array]
def connections
  idle = @__connections
  idle.to_a
end

#discard! ⇒ Object

Discards all connections in the pool (even if they’re currently in use!), along with the pool itself. Any further interaction with the pool is undefined.



261
262
263
264
# File 'lib/rage/ext/active_record/connection_pool.rb', line 261

# Marks the pool as discarded, then calls `discard!` on every connection it
# knows about: the idle ones first, followed by the ones checked out by fibers.
#
# @return [Array] the connections that were discarded
def discard!
  @__discarded = true

  every_connection = @__connections + @__in_use.values
  every_connection.each(&:discard!)
end

#discarded? ⇒ Boolean

Returns:

  • (Boolean)


266
267
268
# File 'lib/rage/ext/active_record/connection_pool.rb', line 266

# Tells whether #discard! has been called on this pool.
#
# @return [Boolean] strictly `true` or `false`
def discarded?
  @__discarded ? true : false
end

#disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) ⇒ Object

Disconnects all connections in the pool, and clears the pool. Raises ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all connections in the pool within a timeout interval (default duration is checkout_timeout * 2 seconds).



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/rage/ext/active_record/connection_pool.rb', line 183

# Disconnects every connection of the pool and replaces them with a freshly
# built set. While fibers still hold connections the method does not block:
# it schedules another attempt through `Iodine.run_after` and returns `nil`
# immediately, so the actual teardown may happen in a later timer callback.
#
# @param raise_on_acquisition_timeout [Boolean] when connections are still in
#   use after the last attempt: `true` raises, `false` disconnects them anyway
# @param disconnect_attempts [Integer] number of attempts made so far; callers
#   leave it at 0, the scheduled retries pass the incremented value
# @raise [ActiveRecord::ExclusiveConnectionTimeoutError] when connections are
#   still in use after the last attempt and `raise_on_acquisition_timeout` is set.
#   NOTE(review): on a retry this is raised inside the `run_after` callback, so
#   presumably the original caller never sees it - confirm.
def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0)
  # allow request fibers to release connections, but block from acquiring new ones
  # (only on the first attempt, so retries do not wrap the list again)
  if disconnect_attempts == 0
    @__connections = BlackHoleList.new(@__connections)
  end

  # if some connections are in use, we will wait for up to `@__checkout_timeout * 2` seconds
  # (retries are scheduled with an argument of 500 - presumably milliseconds, TODO confirm -
  # and keep being scheduled while the attempt counter is <= `@__checkout_timeout * 4`)
  if @__in_use.length > 0 && disconnect_attempts <= @__checkout_timeout * 4
    Iodine.run_after(500) { disconnect(raise_on_acquisition_timeout, disconnect_attempts + 1) }
    return
  end

  # the connections collected so far (idle ones plus any released while waiting)
  pool_connections = @__connections.to_a

  # check if there are still some connections in use
  if @__in_use.length > 0
    raise(ActiveRecord::ExclusiveConnectionTimeoutError, "could not obtain ownership of all database connections") if raise_on_acquisition_timeout
    # forced mode: take the connections away from the fibers that still hold them
    pool_connections += @__in_use.values
    @__in_use.clear
  end

  # disconnect all connections
  pool_connections.each do |conn|
    conn.disconnect!
    __remove__(conn)
  end

  # create a new pool
  @__connections = build_new_connections

  # notify blocked fibers that there are new connections available
  # (one message per fiber that can actually be served)
  [@__blocked.length, @__connections.length].min.times do
    Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS)
  end
end

#disconnect! ⇒ Object

Disconnects all connections in the pool, and clears the pool. The pool first tries to gain ownership of all connections. If unable to do so within a timeout interval (default duration is checkout_timeout * 2 seconds), then the pool is forcefully disconnected without any regard for other connection owning fibers.



223
224
225
# File 'lib/rage/ext/active_record/connection_pool.rb', line 223

# Delegates to #disconnect with the timeout flag set to `false`, i.e. without
# raising when connections are still checked out after the waiting period.
#
# @return [Object] whatever #disconnect returns
def disconnect!
  disconnect(false)
end

#flush(minimum_idle = @__idle_timeout) ⇒ Object

Disconnect all connections that have been idle for at least minimum_idle seconds. Connections currently checked out, or that were checked in less than minimum_idle seconds ago, are unaffected.



130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rage/ext/active_record/connection_pool.rb', line 130

# Disconnects idle connections whose idle time is at least +minimum_idle+.
# Connections without an idle timestamp are skipped; a disconnected
# connection has its timestamp cleared, so it is skipped on later runs too.
#
# @param minimum_idle [Numeric, nil] idle threshold in the units of the
#   monotonic clock (seconds); `nil` turns the call into a no-op
# @return [nil]
def flush(minimum_idle = @__idle_timeout)
  return if minimum_idle.nil?
  return if @__connections.length == 0

  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  index = 0

  # index-based walk: the list length is re-read on every step
  until index >= @__connections.length
    candidate = @__connections[index]
    idle_since = candidate.__idle_since

    if idle_since && now - idle_since >= minimum_idle
      candidate.__idle_since = nil
      candidate.disconnect!
    end

    index += 1
  end
end

#flush! ⇒ Object

Disconnect all currently idle connections. Connections currently checked out are unaffected.



145
146
147
148
# File 'lib/rage/ext/active_record/connection_pool.rb', line 145

# Runs #reap first and then #flush with a threshold of -1, which every
# connection carrying an idle timestamp satisfies.
#
# @return [Object] whatever #flush returns
def flush!
  reap
  flush(-1)
end

#num_waiting_in_queue ⇒ Object



255
256
257
# File 'lib/rage/ext/active_record/connection_pool.rb', line 255

# Counts the fibers currently recorded as waiting for a connection.
#
# @return [Integer]
def num_waiting_in_queue
  @__blocked.size
end

#reap ⇒ Object

Recover lost connections for the pool.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/rage/ext/active_record/connection_pool.rb', line 110

# Recovers connections whose owning fiber is no longer alive.
#
# A connection that still reports itself active is reset and handed back to
# the pool through #release_connection. A connection that is not active is
# disconnected, removed, and replaced with a newly built one; waiting fibers
# are notified in that case.
#
# @return [Hash] the map of connections that are still in use
def reap
  # Work on a snapshot of the dead owners instead of iterating `@__in_use`
  # directly: entries are deleted inside the loop, and the connection calls
  # below may let other code check out a connection, which would add a key
  # to the hash in the middle of its iteration.
  orphaned = @__in_use.reject { |fiber, _| fiber.alive? }

  orphaned.each do |fiber, conn|
    # the entry may have been dealt with while an earlier connection was processed
    next unless @__in_use[fiber].equal?(conn)

    if conn.active?
      conn.reset!
      release_connection(fiber)
    else
      @__in_use.delete(fiber)
      conn.disconnect!
      __remove__(conn)
      @__connections += build_new_connections(1)
      Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
    end
  end

  # same return value as before the snapshot was introduced
  @__in_use
end

#release_connection(owner = Fiber.current) ⇒ Object

Signal that the fiber is finished with the current connection and it can be returned to the pool.



99
100
101
102
103
104
105
106
107
# File 'lib/rage/ext/active_record/connection_pool.rb', line 99

# Takes the connection registered for +owner+ out of the in-use map, stamps
# it with the current monotonic time, and appends it to the idle list. When
# fibers are waiting, a "connection released" message is published.
#
# @param owner [Fiber] the fiber whose connection is being returned
# @return [Object, nil] the released connection, or `nil` when +owner+ held none
def release_connection(owner = Fiber.current)
  released = @__in_use.delete(owner)
  return released unless released

  released.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @__connections << released

  unless @__blocked.length == 0
    Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS)
  end

  released
end

#remove(conn) ⇒ Object

Remove a connection from the connection pool. The connection will remain open and active but will no longer be managed by this pool.



241
242
243
244
245
# File 'lib/rage/ext/active_record/connection_pool.rb', line 241

# Stops managing +conn+: runs the `__remove__` hook, drops every in-use entry
# that points at the connection, and takes it out of the idle list.
#
# @param conn [Object] the connection to forget
# @return [Object, nil] the result of deleting +conn+ from the idle list
def remove(conn)
  __remove__(conn)
  @__in_use.reject! { |_, held| held == conn }
  @__connections.delete(conn)
end

#stat ⇒ Object

Returns the connection pool’s usage statistics.



168
169
170
171
172
173
174
175
176
177
178
# File 'lib/rage/ext/active_record/connection_pool.rb', line 168

# Builds a snapshot of the pool's usage figures.
#
# @return [Hash] with the keys
#   `:size` and `:connections` (both the configured pool size),
#   `:busy` (connections held by live fibers),
#   `:dead` (connections held by fibers that are no longer alive),
#   `:idle`, `:waiting` and `:checkout_timeout`
def stat
  pool_size = size
  live_owners, dead_owners = @__in_use.keys.partition(&:alive?)

  {
    size: pool_size,
    connections: pool_size,
    busy: live_owners.length,
    dead: dead_owners.length,
    idle: @__connections.length,
    waiting: @__blocked.length,
    checkout_timeout: @__checkout_timeout
  }
end

#with_connection ⇒ Object

Yields a connection from the connection pool to the block.



151
152
153
154
155
# File 'lib/rage/ext/active_record/connection_pool.rb', line 151

# Yields the current fiber's connection to the block.
#
# A connection checked out by this call is returned to the pool when the
# block finishes, whether it returns normally or raises. A connection the
# fiber already held before the call is yielded as is and left checked out:
# the code that acquired it stays responsible for releasing it, so a nested
# `with_connection` cannot pull the connection from under its caller.
#
# @yieldparam conn [Object] the connection for the current fiber
# @return [Object] the value of the block
def with_connection
  # remember whether the checkout predates this call
  held_before = active_connection?
  yield connection
ensure
  release_connection unless held_before
end