Class: Async::Pool::Controller
- Inherits: Object
- Defined in: lib/async/pool/controller.rb
Overview
A resource pool controller.
Instance Attribute Summary
-
#constructor ⇒ Object
readonly
The constructor used to create new resources.
-
#limit ⇒ Object
The maximum number of resources that this pool can have at any given time.
-
#policy ⇒ Object
The pool policy.
-
#resources ⇒ Object
readonly
All allocated resources, and their associated usage.
-
#tags ⇒ Object
The name of the pool.
Class Method Summary
-
.wrap(**options, &block) ⇒ Object
Create a new resource pool, using the given block to create new resources.
Instance Method Summary
-
#acquire ⇒ Object
Acquire a resource from the pool.
-
#active? ⇒ Boolean
Whether the pool has any active resources.
-
#as_json ⇒ Object
Generate a JSON representation of the pool.
-
#available? ⇒ Boolean
Whether there are available resources, i.e. whether #acquire can reuse an existing resource.
-
#busy? ⇒ Boolean
Whether there are resources which are currently in use.
-
#close ⇒ Object
Drain the pool, clear all resources, and stop the gardener.
-
#concurrency ⇒ Object
The maximum number of concurrent tasks that can be creating a new resource.
-
#concurrency=(value) ⇒ Object
Set the maximum number of concurrent tasks that can be creating a new resource.
-
#drain ⇒ Object
Drain the pool, closing all resources.
-
#empty? ⇒ Boolean
Whether the pool is empty.
-
#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller
constructor
Create a new resource pool.
-
#prune(retain = 0) ⇒ Object
Retire (and close) all unused resources.
-
#release(resource) ⇒ Object
Make the resource available and let waiting tasks know that there is something available.
-
#retire(resource) ⇒ Object
Retire a specific resource.
-
#size ⇒ Object
The number of resources in the pool.
-
#policy=(value) ⇒ Object
Set the pool policy.
-
#to_json ⇒ Object
Generate a JSON representation of the pool.
-
#to_s ⇒ Object
Generate a human-readable representation of the pool.
-
#wait ⇒ Object
Wait until a pool resource has been freed.
Constructor Details
#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller
Create a new resource pool.
# File 'lib/async/pool/controller.rb', line 29
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
  @constructor = constructor
  @limit = limit

  # This semaphore is used to limit the number of concurrent tasks which are creating new resources.
  @guard = Async::Semaphore.new(concurrency)

  @policy = policy
  @gardener = nil

  @tags = tags

  # All available resources:
  @resources = {}

  # Resources which may be available to be acquired:
  # This list may contain false positives, or resources which were okay but have since entered a state which is unusable.
  @available = []

  # Used to signal when a resource has been released:
  @notification = Async::Notification.new
end
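The default `concurrency: (limit || 1)` in the signature above works because Ruby evaluates keyword defaults left to right, so a later default can read an earlier argument. A minimal sketch of only that rule, using a hypothetical `pool_options` helper that is not part of async-pool:

```ruby
# Hypothetical helper (not part of async-pool). It mirrors only the keyword
# defaults of Controller#initialize and returns them for inspection.
def pool_options(limit: nil, concurrency: (limit || 1))
  { limit: limit, concurrency: concurrency }
end

pool_options.fetch(:concurrency)                            # => 1
pool_options(limit: 4).fetch(:concurrency)                  # => 4
pool_options(limit: 4, concurrency: 2).fetch(:concurrency)  # => 2
```

In other words, an unlimited pool creates one resource at a time by default, while a limited pool may create up to `limit` resources concurrently unless `concurrency` is given explicitly.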
Instance Attribute Details
#constructor ⇒ Object (readonly)
The constructor used to create new resources.
# File 'lib/async/pool/controller.rb', line 53
attr :constructor
#limit ⇒ Object
The maximum number of resources that this pool can have at any given time.
# File 'lib/async/pool/controller.rb', line 56
attr_accessor :limit
#policy ⇒ Object
The pool policy.
# File 'lib/async/pool/controller.rb', line 93
attr_accessor :policy
#resources ⇒ Object (readonly)
All allocated resources, and their associated usage.
# File 'lib/async/pool/controller.rb', line 96
attr :resources
#tags ⇒ Object
The name of the pool.
# File 'lib/async/pool/controller.rb', line 99
attr_accessor :tags
Class Method Details
.wrap(**options, &block) ⇒ Object
Create a new resource pool, using the given block to create new resources.
# File 'lib/async/pool/controller.rb', line 19
def self.wrap(**options, &block)
  self.new(block, **options)
end
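The forwarding done by .wrap can be tried without the gem. The sketch below uses a hypothetical `WrapSketch` class that keeps only what .wrap touches; it is a stand-in for illustration, not the controller itself.

```ruby
# Simplified stand-in for the controller (hypothetical, illustration only).
class WrapSketch
  attr_reader :constructor, :limit

  # Same shape as Controller.wrap: the block becomes the constructor argument
  # and the keyword options are passed through unchanged.
  def self.wrap(**options, &block)
    new(block, **options)
  end

  def initialize(constructor, limit: nil)
    @constructor = constructor
    @limit = limit
  end
end

pool = WrapSketch.wrap(limit: 2) { "connection" }
pool.constructor.call # => "connection"
pool.limit            # => 2
```

The block is stored as a Proc and only called when a new resource is needed, so wrapping a pool does not create any resource up front in this sketch.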
Instance Method Details
#acquire ⇒ Object
Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.
# File 'lib/async/pool/controller.rb', line 136
def acquire
  resource = wait_for_resource

  return resource unless block_given?

  begin
    yield resource
  ensure
    release(resource)
  end
end
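The two calling styles of #acquire differ in who releases the resource. The sketch below copies the method body into a hypothetical `AcquireSketch` class whose `wait_for_resource` and `release` are stubs (assumptions for illustration), so the release behavior can be observed in plain Ruby.

```ruby
# Hypothetical stand-in: only #acquire matches the listing above.
class AcquireSketch
  attr_reader :released

  def initialize
    @released = []
  end

  def acquire
    resource = wait_for_resource

    return resource unless block_given?

    begin
      yield resource
    ensure
      release(resource)
    end
  end

  # Stub: records the release instead of returning the resource to a pool.
  def release(resource)
    @released << resource
  end

  private

  # Assumption: the real pool waits for or creates a resource here.
  def wait_for_resource
    :connection
  end
end

pool = AcquireSketch.new

begin
  pool.acquire { |resource| raise "query failed on #{resource}" }
rescue RuntimeError
  # The error still reaches the caller, but the resource was released first.
end
pool.released # => [:connection]

# Without a block the caller owns the resource and must release it.
resource = pool.acquire
pool.release(resource)
```

The block form is usually the safer choice because the `ensure` clause releases the resource even when the block raises.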
#active? ⇒ Boolean
Whether the pool has any active resources.
# File 'lib/async/pool/controller.rb', line 107
def active?
  !@resources.empty?
end
#as_json ⇒ Object
Generate a JSON representation of the pool.
# File 'lib/async/pool/controller.rb', line 68
def as_json(...)
  {
    limit: @limit,
    concurrency: @guard.limit,
    usage: @resources.size,
    availability_summary: self.availability_summary,
  }
end
#available? ⇒ Boolean
Whether there are available resources, i.e. whether #acquire can reuse an existing resource.
# File 'lib/async/pool/controller.rb', line 121
def available?
  @available.any?
end
#busy? ⇒ Boolean
Whether there are resources which are currently in use.
# File 'lib/async/pool/controller.rb', line 112
def busy?
  @resources.collect do |_, usage|
    return true if usage > 0
  end

  return false
end
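#active? and #busy? answer different questions about the same resource-to-usage table. The sketch below models that table with a plain Hash inside a hypothetical `UsageSketch` class; `busy?` is written with `any?`, which should give the same answer as the early-return loop above.

```ruby
# Hypothetical stand-in that holds only the resource => usage count table.
class UsageSketch
  def initialize(resources)
    @resources = resources
  end

  def size
    @resources.size
  end

  def empty?
    @resources.empty?
  end

  # True as soon as the pool holds any resource, used or idle.
  def active?
    !@resources.empty?
  end

  # True only when at least one resource has a usage count above zero.
  def busy?
    @resources.any? { |_, usage| usage > 0 }
  end
end

idle = UsageSketch.new({ "a" => 0 })
idle.active? # => true
idle.busy?   # => false

working = UsageSketch.new({ "a" => 0, "b" => 2 })
working.busy? # => true
working.size  # => 2
```

A pool that holds only idle resources is therefore active but not busy, which is the state in which pruning can retire resources.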
#close ⇒ Object
Drain the pool, clear all resources, and stop the gardener.
# File 'lib/async/pool/controller.rb', line 173
def close
  self.drain

  @available.clear
  @gardener&.stop
end
#concurrency ⇒ Object
The maximum number of concurrent tasks that can be creating a new resource.
# File 'lib/async/pool/controller.rb', line 83
def concurrency
  @guard.limit
end
#concurrency=(value) ⇒ Object
Set the maximum number of concurrent tasks that can be creating a new resource.
# File 'lib/async/pool/controller.rb', line 88
def concurrency= value
  @guard.limit = value
end
#drain ⇒ Object
Drain the pool, closing all resources.
# File 'lib/async/pool/controller.rb', line 163
def drain
  Console.debug(self, "Draining pool...", size: @resources.size)

  # Enumerate all existing resources and retire them:
  while resource = acquire_existing_resource
    retire(resource)
  end
end
#empty? ⇒ Boolean
Whether the pool is empty.
# File 'lib/async/pool/controller.rb', line 131
def empty?
  @resources.empty?
end
#prune(retain = 0) ⇒ Object
Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
# File 'lib/async/pool/controller.rb', line 183
def prune(retain = 0)
  unused = []

  # This code must not context switch:
  @resources.each do |resource, usage|
    if usage.zero?
      unused << resource
    end
  end

  # It's okay for this to context switch:
  unused.each do |resource|
    if block_given?
      yield resource
    else
      retire(resource)
    end

    break if @resources.size <= retain
  end

  # Update availability list:
  @available.clear
  @resources.each do |resource, usage|
    if usage < resource.concurrency and resource.reusable?
      @available << resource
    end
  end

  return unused.size
end
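The interaction between `retain` and the return value of #prune is easy to misread, so here is a reduced model of the retire loop. `PrunableResource` and `prune_sketch` are hypothetical names; the sketch leaves out the block form and the availability list update and keeps only the collect, retire, and break steps.

```ruby
# Hypothetical resource that only remembers whether it was closed.
class PrunableResource
  attr_reader :name

  def initialize(name)
    @name = name
    @closed = false
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# resources maps resource => usage count, like @resources in the controller.
def prune_sketch(resources, retain = 0)
  # Collect first, so the table is not modified while it is being scanned.
  unused = resources.select { |_, usage| usage.zero? }.keys

  unused.each do |resource|
    # Stand-in for retire(resource): forget the resource, then close it.
    resources.delete(resource)
    resource.close

    break if resources.size <= retain
  end

  unused.size
end

a, b, c, d = %w[a b c d].map { |name| PrunableResource.new(name) }
resources = { a => 0, b => 0, c => 0, d => 1 }

prune_sketch(resources, 2) # => 3
resources.keys.map(&:name) # => ["c", "d"]
```

In this model the return value counts the unused resources that were found (three), not the ones that were retired (two). The `retain` check also runs after each retirement, so one unused resource is retired even when the pool is already at or below `retain`.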
#release(resource) ⇒ Object
Make the resource available and let waiting tasks know that there is something available.
# File 'lib/async/pool/controller.rb', line 149
def release(resource)
  processed = false

  # A resource that is not good should also not be reusable.
  if resource.reusable?
    processed = reuse(resource)
  end

  # @policy.released(self, resource)
ensure
  retire(resource) unless processed
end
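The `processed` flag together with the method-level `ensure` means a released resource is either reused or retired, never dropped. The sketch below reproduces that control flow with hypothetical `ReleasableResource` and `ReleaseSketch` classes. The `reuse` method is not shown on this page, so the version here is an assumption that simply records the resource as available.

```ruby
# Hypothetical resource with the two methods #release relies on.
class ReleasableResource
  def initialize(reusable)
    @reusable = reusable
    @closed = false
  end

  def reusable?
    @reusable
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Hypothetical stand-in: only #release follows the listing above.
class ReleaseSketch
  attr_reader :available, :retired

  def initialize
    @available = []
    @retired = []
  end

  def release(resource)
    processed = false

    if resource.reusable?
      processed = reuse(resource)
    end
  ensure
    # Runs on every path, including when reusable? or reuse raises.
    retire(resource) unless processed
  end

  def retire(resource)
    resource.close
    @retired << resource
    true
  end

  private

  # Assumption: records the resource as available and reports success.
  def reuse(resource)
    @available << resource
    true
  end
end

pool = ReleaseSketch.new
healthy = ReleasableResource.new(true)
broken = ReleasableResource.new(false)

pool.release(healthy)
pool.release(broken)

pool.available.include?(healthy) # => true
pool.retired.include?(broken)    # => true
broken.closed?                   # => true
```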
#retire(resource) ⇒ Object
Retire a specific resource.
# File 'lib/async/pool/controller.rb', line 216
def retire(resource)
  Console.debug(self) {"Retire #{resource}"}

  @resources.delete(resource)

  resource.close

  @notification.signal

  return true
end
#size ⇒ Object
The number of resources in the pool.
# File 'lib/async/pool/controller.rb', line 102
def size
  @resources.size
end
#policy=(value) ⇒ Object
Set the pool policy.
# File 'lib/async/pool/controller.rb', line 93
attr_accessor :policy
#to_json ⇒ Object
Generate a JSON representation of the pool.
# File 'lib/async/pool/controller.rb', line 78
def to_json(...)
  as_json.to_json(...)
end
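The #as_json and #to_json pair follows a common Ruby pattern: build a plain Hash, then let the standard `json` library serialize it. The sketch below shows the pattern with a hypothetical `JsonSketch` class. It omits `availability_summary`, whose implementation is not shown on this page, and takes the concurrency as a plain number instead of reading it from a semaphore.

```ruby
require "json"

# Hypothetical stand-in that keeps only the fields needed for serialization.
class JsonSketch
  def initialize(limit:, concurrency:, resources:)
    @limit = limit
    @concurrency = concurrency
    @resources = resources
  end

  # Plain-Hash view of the pool; arguments are accepted and ignored.
  def as_json(...)
    {
      limit: @limit,
      concurrency: @concurrency,
      usage: @resources.size,
    }
  end

  # Delegates to Hash#to_json, forwarding any generator arguments.
  def to_json(...)
    as_json.to_json(...)
  end
end

pool = JsonSketch.new(limit: 4, concurrency: 2, resources: { "a" => 1 })
puts pool.to_json # prints {"limit":4,"concurrency":2,"usage":1}
```

Forwarding the arguments with `...` matters because `JSON.generate` passes its generator state to `to_json` when the object is nested inside another structure.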
#to_s ⇒ Object
Generate a human-readable representation of the pool.
# File 'lib/async/pool/controller.rb', line 59
def to_s
  if @resources.empty?
    "\#<#{self.class}(#{usage_string})>"
  else
    "\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
  end
end
#wait ⇒ Object
Wait until a pool resource has been freed.
# File 'lib/async/pool/controller.rb', line 126
def wait
  @notification.wait
end