Class: ConnectionQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_pool/connection_queue.rb

Overview

A thread-safe implementation of a connection queue. Supports adding, removing, and polling a connection synchronously, and doesn’t create more than ‘max_size` elements. All connections are created lazily (only when needed).

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size = 0, &block) ⇒ ConnectionQueue

Returns a new instance of ConnectionQueue.



14
15
16
17
18
19
20
21
# File 'lib/redis_pool/connection_queue.rb', line 14

def initialize(max_size = 0, &block)
  @create_block = block
  @created = 0
  @queue = []
  @max_size = max_size
  @lock = Monitor.new
  @lock_cond = @lock.new_cond
end

Instance Attribute Details

#max_sizeObject (readonly)

Returns the value of attribute max_size.



12
13
14
# File 'lib/redis_pool/connection_queue.rb', line 12

def max_size
  @max_size
end

#queueObject (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/redis_pool/connection_queue.rb', line 12

def queue
  @queue
end

Instance Method Details

#add(element) ⇒ Object Also known as: <<, push

Adds (or returns) a connection to the available queue, synchronously.



26
27
28
29
30
31
# File 'lib/redis_pool/connection_queue.rb', line 26

def add(element)
  synchronize do
    @queue.push element
    @lock_cond.signal
  end
end

#available_to_createObject

Returns the number of available connections to create.



82
83
84
# File 'lib/redis_pool/connection_queue.rb', line 82

def available_to_create
  @max_size - @created
end

#delete(element) ⇒ Object

Removes an idle connection from the queue synchronously.



63
64
65
66
67
# File 'lib/redis_pool/connection_queue.rb', line 63

def delete(element)
  synchronize do
    @queue.delete element
  end
end

#poll(timeout = 5) ⇒ Object Also known as: pop

Fetches any available connection from the queue. If a connection is not available, waits for timeout until a connection is available or raises a TimeoutError.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/redis_pool/connection_queue.rb', line 40

def poll(timeout = 5)
  t0 = Concurrent.monotonic_time
  elapsed = 0
  synchronize do
    loop do
      return get_connection if connection_available?

      connection = create_connection
      return connection if connection

      elapsed = Concurrent.monotonic_time - t0
      raise TimeoutError, 'could not obtain connection' if elapsed >= timeout

      @lock_cond.wait(timeout - elapsed)
    end
  end
end

#total_availableObject

Returns the total available connections to be used. This takes into account the number of connections that can be created as well. So it is all connections that can be used AND created.



75
76
77
# File 'lib/redis_pool/connection_queue.rb', line 75

def total_available
  @max_size - @created + @queue.length
end