Class: Async::Pool::Controller
- Inherits:
-
Object
- Object
- Async::Pool::Controller
- Defined in:
- lib/async/pool/controller.rb
Overview
A resource pool controller.
Instance Attribute Summary collapse
- #all allocated resources, and their associated usage.(allocatedresources) ⇒ Object readonly
-
#constructor ⇒ Object
readonly
Returns the value of attribute constructor.
-
#limit ⇒ Object
Returns the value of attribute limit.
-
#policy ⇒ Object
Returns the value of attribute policy.
-
#resources ⇒ Object
readonly
Returns the value of attribute resources.
-
#tags ⇒ Object
Returns the value of attribute tags.
- #The constructor used to create new resources.(constructorusedtocreatenewresources.) ⇒ Object readonly
- #The maximum number of concurrent tasks that can be creating a new resource.(maximumnumberofconcurrenttasksthatcanbecreatinganewresource.) ⇒ Object readonly
- #The maximum number of resources that this pool can have at any given time.(maximumnumberofresourcesthatthispoolcanhaveatanygiventime.) ⇒ Object readonly
- #The name of the pool.(nameofthepool.) ⇒ Object readonly
Class Method Summary collapse
-
.wrap(**options, &block) ⇒ Object
Create a new resource pool, using the given block to create new resources.
Instance Method Summary collapse
-
#acquire ⇒ Object
Acquire a resource from the pool.
-
#active? ⇒ Boolean
Whether the pool has any active resources.
-
#as_json ⇒ Object
Generate a JSON representation of the pool.
-
#available? ⇒ Boolean
Whether there are available resources, i.e.
-
#busy? ⇒ Boolean
Whether there are resources which are currently in use.
-
#close ⇒ Object
Close all resources in the pool.
- #concurrency ⇒ Object
-
#concurrency=(value) ⇒ Object
Set the maximum number of concurrent tasks that can be creating a new resource.
- #drain ⇒ Object
-
#empty? ⇒ Boolean
Whether the pool is empty.
-
#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller
constructor
Create a new resource pool.
-
#prune(retain = 0) ⇒ Object
Retire (and close) all unused resources.
-
#release(resource) ⇒ Object
Make the resource resources and let waiting tasks know that there is something resources.
-
#retire(resource) ⇒ Object
Retire a specific resource.
-
#size ⇒ Object
The number of resources in the pool.
- #The pool policy.=(poolpolicy. = (value)) ⇒ Object
-
#to_json ⇒ Object
Generate a JSON representation of the pool.
-
#to_s ⇒ Object
Generate a human-readable representation of the pool.
-
#wait ⇒ Object
Wait until a pool resource has been freed.
Constructor Details
#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller
Create a new resource pool.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/async/pool/controller.rb', line 31 def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) @constructor = constructor @limit = limit # This semaphore is used to limit the number of concurrent tasks which are creating new resources. @guard = Async::Semaphore.new(concurrency) @policy = policy @gardener = nil @tags = # All available resources: @resources = {} # Resources which may be available to be acquired: # This list may contain false positives, or resources which were okay but have since entered a state which is unusuable. @available = [] # Used to signal when a resource has been released: @notification = Async::Notification.new end |
Instance Attribute Details
#all allocated resources, and their associated usage.(allocatedresources) ⇒ Object (readonly)
98 |
# File 'lib/async/pool/controller.rb', line 98 attr :resources |
#constructor ⇒ Object (readonly)
Returns the value of attribute constructor.
55 56 57 |
# File 'lib/async/pool/controller.rb', line 55 def constructor @constructor end |
#limit ⇒ Object
Returns the value of attribute limit.
58 59 60 |
# File 'lib/async/pool/controller.rb', line 58 def limit @limit end |
#policy ⇒ Object
Returns the value of attribute policy.
95 96 97 |
# File 'lib/async/pool/controller.rb', line 95 def policy @policy end |
#resources ⇒ Object (readonly)
Returns the value of attribute resources.
98 99 100 |
# File 'lib/async/pool/controller.rb', line 98 def resources @resources end |
#tags ⇒ Object
Returns the value of attribute tags.
101 102 103 |
# File 'lib/async/pool/controller.rb', line 101 def @tags end |
#The constructor used to create new resources.(constructorusedtocreatenewresources.) ⇒ Object (readonly)
55 |
# File 'lib/async/pool/controller.rb', line 55 attr :constructor |
#The maximum number of concurrent tasks that can be creating a new resource.(maximumnumberofconcurrenttasksthatcanbecreatinganewresource.) ⇒ Object (readonly)
85 86 87 |
# File 'lib/async/pool/controller.rb', line 85 def concurrency @guard.limit end |
#The maximum number of resources that this pool can have at any given time.(maximumnumberofresourcesthatthispoolcanhaveatanygiventime.) ⇒ Object (readonly)
58 |
# File 'lib/async/pool/controller.rb', line 58 attr_accessor :limit |
#The name of the pool.(nameofthepool.) ⇒ Object (readonly)
101 |
# File 'lib/async/pool/controller.rb', line 101 attr_accessor :tags |
Class Method Details
.wrap(**options, &block) ⇒ Object
Create a new resource pool, using the given block to create new resources.
21 22 23 |
# File 'lib/async/pool/controller.rb', line 21 def self.wrap(**, &block) self.new(block, **) end |
Instance Method Details
#acquire ⇒ Object
Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.
138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/async/pool/controller.rb', line 138 def acquire resource = wait_for_resource return resource unless block_given? begin yield resource ensure release(resource) end end |
#active? ⇒ Boolean
Whether the pool has any active resources.
109 110 111 |
# File 'lib/async/pool/controller.rb', line 109 def active? !@resources.empty? end |
#as_json ⇒ Object
Generate a JSON representation of the pool.
70 71 72 73 74 75 76 77 |
# File 'lib/async/pool/controller.rb', line 70 def as_json(...) { limit: @limit, concurrency: @guard.limit, usage: @resources.size, availability_summary: self.availability_summary, } end |
#available? ⇒ Boolean
Whether there are available resources, i.e. whether #acquire can reuse an existing resource.
123 124 125 |
# File 'lib/async/pool/controller.rb', line 123 def available? @available.any? end |
#busy? ⇒ Boolean
Whether there are resources which are currently in use.
114 115 116 117 118 119 120 |
# File 'lib/async/pool/controller.rb', line 114 def busy? @resources.collect do |_, usage| return true if usage > 0 end return false end |
#close ⇒ Object
Close all resources in the pool.
174 175 176 177 178 179 |
# File 'lib/async/pool/controller.rb', line 174 def close self.drain @available.clear @gardener&.stop end |
#concurrency ⇒ Object
85 86 87 |
# File 'lib/async/pool/controller.rb', line 85 def concurrency @guard.limit end |
#concurrency=(value) ⇒ Object
Set the maximum number of concurrent tasks that can be creating a new resource.
90 91 92 |
# File 'lib/async/pool/controller.rb', line 90 def concurrency= value @guard.limit = value end |
#drain ⇒ Object
164 165 166 167 168 169 170 171 |
# File 'lib/async/pool/controller.rb', line 164 def drain Console.debug(self, "Draining pool...", size: @resources.size) # Enumerate all existing resources and retire them: while resource = acquire_existing_resource retire(resource) end end |
#empty? ⇒ Boolean
Whether the pool is empty.
133 134 135 |
# File 'lib/async/pool/controller.rb', line 133 def empty? @resources.empty? end |
#prune(retain = 0) ⇒ Object
Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/async/pool/controller.rb', line 184 def prune(retain = 0) unused = [] # This code must not context switch: @resources.each do |resource, usage| if usage.zero? unused << resource end end # It's okay for this to context switch: unused.each do |resource| if block_given? yield resource else retire(resource) end break if @resources.size <= retain end # Update availability list: @available.clear @resources.each do |resource, usage| if usage < resource.concurrency and resource.reusable? @available << resource end end return unused.size end |
#release(resource) ⇒ Object
Make the resource resources and let waiting tasks know that there is something resources.
151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/async/pool/controller.rb', line 151 def release(resource) processed = false # A resource that is not good should also not be reusable. if resource.reusable? processed = reuse(resource) end # @policy.released(self, resource) ensure retire(resource) unless processed end |
#retire(resource) ⇒ Object
Retire a specific resource.
217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/async/pool/controller.rb', line 217 def retire(resource) Console.debug(self) {"Retire #{resource}"} @resources.delete(resource) resource.close @notification.signal return true end |
#size ⇒ Object
The number of resources in the pool.
104 105 106 |
# File 'lib/async/pool/controller.rb', line 104 def size @resources.size end |
#The pool policy.=(poolpolicy. = (value)) ⇒ Object
95 |
# File 'lib/async/pool/controller.rb', line 95 attr_accessor :policy |
#to_json ⇒ Object
Generate a JSON representation of the pool.
80 81 82 |
# File 'lib/async/pool/controller.rb', line 80 def to_json(...) as_json.to_json(...) end |
#to_s ⇒ Object
Generate a human-readable representation of the pool.
61 62 63 64 65 66 67 |
# File 'lib/async/pool/controller.rb', line 61 def to_s if @resources.empty? "\#<#{self.class}(#{usage_string})>" else "\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>" end end |
#wait ⇒ Object
Wait until a pool resource has been freed.
128 129 130 |
# File 'lib/async/pool/controller.rb', line 128 def wait @notification.wait end |