Class: HTTPX::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/httpx/pool.rb

Constant Summary collapse

POOL_TIMEOUT =
5

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Pool

Sets up the connection pool with the given options, which can be the following:

:max_connections_per_origin

the maximum number of connections held in the pool pointing to a given origin.

:pool_timeout

the number of seconds to wait for a connection to a given origin (before raising HTTPX::PoolTimeoutError)



19
20
21
22
23
24
25
26
27
28
# File 'lib/httpx/pool.rb', line 19

# Builds an empty pool.
#
# +options+ is read through +fetch(key, default)+ (a Hash works). Keys:
#
# :max_connections_per_origin :: upper bound on connections per origin;
#                                falls back to Float::INFINITY when absent.
# :pool_timeout :: seconds to wait for a connection to an origin;
#                  falls back to POOL_TIMEOUT when absent.
def initialize(options)
  @max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
  @pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)

  # Connection state; the pool's methods touch it while holding @connection_mtx.
  @connection_mtx = Thread::Mutex.new
  @connections = []
  @origin_counters = Hash.new(0)
  # One condition variable per origin, created lazily on first lookup.
  @origin_conds = Hash.new { |conds, origin| conds[origin] = ConditionVariable.new }

  # Resolver state; the pool's methods touch it while holding @resolver_mtx.
  @resolver_mtx = Thread::Mutex.new
  # Resolver lists keyed by resolver type, created lazily on first lookup.
  @resolvers = Hash.new { |by_type, resolver_type| by_type[resolver_type] = [] }
end

Instance Method Details

#checkin_connection(connection) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/httpx/pool.rb', line 61

# Hands +connection+ back to the pool and signals the condition variable
# registered for its origin.
#
# Connections whose options carry an +io+ are not pooled; the method returns
# nil for them without touching any state.
def checkin_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    @connections.push(connection)

    # Condition variables are keyed by the origin's string form.
    origin_key = connection.origin.to_s
    @origin_conds[origin_key].signal
  end
end

#checkin_resolver(resolver) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/httpx/pool.rb', line 100

# Returns a resolver to the pool.
#
# The list is selected by the class of the resolver passed in, but the object
# actually stored is +resolver.multi+; it is appended only when the list does
# not already include it.
def checkin_resolver(resolver)
  @resolver_mtx.synchronize do
    bucket = @resolvers[resolver.class]
    pooled = resolver.multi

    bucket.push(pooled) unless bucket.include?(pooled)
  end
end

#checkout_connection(uri, options) ⇒ Object

Opens a connection to the IP reachable through uri. Many hostnames are reachable through the same IP, so we try to maximize pipelining by opening as few connections as possible.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/httpx/pool.rb', line 42

# Returns a connection usable for +uri+.
#
# * When +options.io+ is set, the pool is bypassed and a new connection is
#   created directly.
# * Otherwise an existing connection is acquired if possible.
# * If none is available and the origin already has
#   +@max_connections_per_origin+ connections, the caller waits up to
#   +@pool_timeout+ seconds on that origin's condition variable, then tries
#   once more; PoolTimeoutError is raised if that attempt also fails.
# * If none is available and the origin is below the limit, the origin's
#   counter is incremented and a new connection is created.
def checkout_connection(uri, options)
  return checkout_new_connection(uri, options) if options.io

  @connection_mtx.synchronize do
    available = acquire_connection(uri, options)
    next available if available

    origin = uri.origin

    if @origin_counters[origin] == @max_connections_per_origin
      # Origin is saturated: wait for a check-in signal (or the timeout).
      @origin_conds[origin].wait(@connection_mtx, @pool_timeout)

      available = acquire_connection(uri, options)
      raise(PoolTimeoutError.new(origin, @pool_timeout)) unless available

      next available
    end

    @origin_counters[origin] += 1

    checkout_new_connection(uri, options)
  end
end

#checkout_mergeable_connection(connection) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/httpx/pool.rb', line 71

# Removes from the pool, and returns, the first pooled connection other than
# +connection+ itself that reports +mergeable?(connection)+.
#
# Returns nil when no such connection is pooled, or when +connection+'s
# options carry an +io+ (in which case the pool is not consulted).
def checkout_mergeable_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    position = @connections.index do |candidate|
      candidate != connection && candidate.mergeable?(connection)
    end

    position && @connections.delete_at(position)
  end
end

#checkout_resolver(options) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/httpx/pool.rb', line 86

# Returns a resolver for +options+.
#
# The resolver type is derived from +options.resolver_class+ via
# +Resolver.resolver_for+. A pooled resolver of that type whose options equal
# +options+ is removed from the pool and returned; when none is pooled, a new
# one is obtained from +checkout_new_resolver+.
def checkout_resolver(options)
  resolver_type = Resolver.resolver_for(options.resolver_class)

  pooled = @resolver_mtx.synchronize do
    candidates = @resolvers[resolver_type]
    position = candidates.index { |res| res.options == options }

    candidates.delete_at(position) if position
  end

  # The fallback runs outside the mutex, as in the pooled lookup above it.
  pooled || checkout_new_resolver(resolver_type, options)
end

#pop_connection ⇒ Object



30
31
32
33
34
35
36
# File 'lib/httpx/pool.rb', line 30

# Removes and returns the oldest connection held in the pool, or nil when the
# pool holds none.
#
# The popped connection's per-origin counter is decremented; once it reaches
# zero, the condition variable registered for that origin is discarded.
def pop_connection
  @connection_mtx.synchronize do
    conn = @connections.shift
    next unless conn

    # @origin_conds is populated with string keys (checkin_connection uses
    # connection.origin.to_s), so the delete must use the string form too;
    # passing the raw origin object would never match an entry.
    origin = conn.origin.to_s
    @origin_conds.delete(origin) if (@origin_counters[origin] -= 1).zero?

    conn
  end
end

#reset_resolvers ⇒ Object



82
83
84
# File 'lib/httpx/pool.rb', line 82

# Empties the resolver pool (every resolver type at once) while holding the
# resolver mutex.
def reset_resolvers
  @resolver_mtx.synchronize do
    @resolvers.clear
  end
end