Class: Que::ConnectionPool

Inherits:
Object
  • Object
show all
Defined in:
lib/que/connection_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ ConnectionPool

Returns a new instance of ConnectionPool.



8
9
10
11
12
13
# File 'lib/que/connection_pool.rb', line 8

def initialize(&block)
  @connection_proc = block
  @checked_out     = Set.new
  @mutex           = Mutex.new
  @thread_key      = "que_connection_pool_#{object_id}".to_sym
end

Instance Method Details

#checkoutObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/que/connection_pool.rb', line 15

def checkout
  # Do some asserting to ensure that the connection pool we're using is
  # behaving properly.
  @connection_proc.call do |conn|
    # Did this pool already have a connection for this thread?
    preexisting = wrapped = current_connection

    begin
      if preexisting
        # If so, check that the connection we just got is the one we expect.
        if preexisting.wrapped_connection.backend_pid != conn.backend_pid
          raise Error, "Connection pool is not reentrant! previous: #{preexisting.wrapped_connection.inspect} now: #{conn.inspect}"
        end
      else
        # If not, make sure that it wasn't promised to any other threads.
        sync do
          Que.assert(@checked_out.add?(conn.backend_pid)) do
            "Connection pool didn't synchronize access properly! (entrance: #{conn.backend_pid})"
          end
        end

        self.current_connection = wrapped = Connection.wrap(conn)
      end

      yield(wrapped)
    ensure
      if preexisting.nil?
        # We're at the top level (about to return this connection to the
        # pool we got it from), so mark it as no longer ours.
        self.current_connection = nil

        sync do
          Que.assert(@checked_out.delete?(conn.backend_pid)) do
            "Connection pool didn't synchronize access properly! (exit: #{conn.backend_pid})"
          end
        end
      end
    end
  end
end

#execute(*args) ⇒ Object



56
57
58
# File 'lib/que/connection_pool.rb', line 56

def execute(*args)
  checkout { |conn| conn.execute(*args) }
end

#in_transaction?Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/que/connection_pool.rb', line 60

def in_transaction?
  checkout { |conn| conn.in_transaction? }
end