Class: Async::Pool::Controller

Inherits:
Object
  • Object
show all
Defined in:
lib/async/pool/controller.rb

Overview

A resource pool controller.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller

Create a new resource pool.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/async/pool/controller.rb', line 31

# Create a new resource pool.
#
# `constructor` is stored and later used to create new resources.
# `limit` is the maximum number of resources the pool may hold (nil leaves it unset).
# `concurrency` bounds how many tasks may be creating resources at once; it defaults to `limit`, or 1 when no limit is given.
# `policy` and `tags` are stored as given.
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
	# Configuration supplied by the caller:
	@constructor, @limit = constructor, limit
	@policy, @tags = policy, tags
	
	# Semaphore limiting the number of concurrent tasks which are creating new resources:
	@guard = Async::Semaphore.new(concurrency)
	
	# Starts out unset; #close stops it when present:
	@gardener = nil
	
	# All allocated resources, and their associated usage:
	@resources = {}
	
	# Resources which may be available to be acquired.
	# This list may contain false positives, i.e. resources which were okay but have since become unusable.
	@available = []
	
	# Signalled when a resource has been released:
	@notification = Async::Notification.new
end

Instance Attribute Details

#resources ⇒ Object (readonly)

All allocated resources, and their associated usage.



98
# File 'lib/async/pool/controller.rb', line 98

# All allocated resources, and their associated usage.
attr :resources

#constructor ⇒ Object (readonly)

Returns the value of attribute constructor.



55
56
57
# File 'lib/async/pool/controller.rb', line 55

# Reader for the stored constructor (the value given to #initialize).
def constructor
  return @constructor
end

#limit ⇒ Object

Returns the value of attribute limit.



58
59
60
# File 'lib/async/pool/controller.rb', line 58

# Reader for the configured resource limit.
def limit
  return @limit
end

#policy ⇒ Object

Returns the value of attribute policy.



95
96
97
# File 'lib/async/pool/controller.rb', line 95

# Reader for the pool policy.
def policy
  return @policy
end

#resources ⇒ Object (readonly)

Returns the value of attribute resources.



98
99
100
# File 'lib/async/pool/controller.rb', line 98

# Reader for the table of allocated resources and their usage.
def resources
  return @resources
end

#tags ⇒ Object

Returns the value of attribute tags.



101
102
103
# File 'lib/async/pool/controller.rb', line 101

# Reader for the tags stored on the pool.
def tags
  return @tags
end

#constructor ⇒ Object (readonly)

The constructor used to create new resources.



55
# File 'lib/async/pool/controller.rb', line 55

# The constructor used to create new resources.
attr :constructor

#concurrency ⇒ Object

The maximum number of concurrent tasks that can be creating a new resource.



85
86
87
# File 'lib/async/pool/controller.rb', line 85

# The maximum number of concurrent tasks that can be creating a new resource,
# as reported by the guard semaphore's limit.
def concurrency
	return @guard.limit
end

#limit ⇒ Object

The maximum number of resources that this pool can have at any given time.



58
# File 'lib/async/pool/controller.rb', line 58

# The maximum number of resources that this pool can have at any given time.
attr_accessor :limit

#tags ⇒ Object

The name of the pool.



101
# File 'lib/async/pool/controller.rb', line 101

# Tags stored on the pool. NOTE(review): the generated documentation describes this attribute as "The name of the pool." — confirm the intended meaning.
attr_accessor :tags

Class Method Details

.wrap(**options, &block) ⇒ Object

Create a new resource pool, using the given block to create new resources.



21
22
23
# File 'lib/async/pool/controller.rb', line 21

# Create a new resource pool, using the given block to create new resources.
#
# The block becomes the pool's constructor; every keyword option is forwarded unchanged to `new`.
def self.wrap(**options, &block)
	new(block, **options)
end

Instance Method Details

#acquire ⇒ Object

Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.



138
139
140
141
142
143
144
145
146
147
148
# File 'lib/async/pool/controller.rb', line 138

# Acquire a resource from the pool.
#
# Without a block, the acquired resource is returned and the caller is responsible for releasing it.
# With a block, the resource is yielded and then released once the block finishes (even if it raises); the block's value is returned.
def acquire
	acquired = wait_for_resource
	
	if block_given?
		begin
			yield acquired
		ensure
			release(acquired)
		end
	else
		acquired
	end
end

#active? ⇒ Boolean

Whether the pool has any active resources.

Returns:

  • (Boolean)


109
110
111
# File 'lib/async/pool/controller.rb', line 109

# Whether the pool has any active resources, i.e. the resource table is non-empty.
def active?
	@resources.any?
end

#as_json ⇒ Object

Generate a JSON representation of the pool.



70
71
72
73
74
75
76
77
# File 'lib/async/pool/controller.rb', line 70

# Generate a JSON-compatible representation of the pool.
#
# Any arguments are accepted and ignored. The result is a Hash with the keys
# `:limit`, `:concurrency`, `:usage` (number of resources) and `:availability_summary`.
def as_json(...)
	representation = {}
	
	representation[:limit] = @limit
	representation[:concurrency] = @guard.limit
	representation[:usage] = @resources.size
	representation[:availability_summary] = availability_summary
	
	representation
end

#available? ⇒ Boolean

Whether there are available resources, i.e. whether #acquire can reuse an existing resource.

Returns:

  • (Boolean)


123
124
125
# File 'lib/async/pool/controller.rb', line 123

# Whether there are available resources, i.e. whether #acquire can reuse an existing resource.
#
# Reflects the availability list only, which may contain false positives.
def available?
	!@available.none?
end

#busy? ⇒ Boolean

Whether there are resources which are currently in use.

Returns:

  • (Boolean)


114
115
116
117
118
119
120
# File 'lib/async/pool/controller.rb', line 114

# Whether there are resources which are currently in use.
#
# Returns `true` as soon as any resource has a usage count above zero, and `false` otherwise (including when the pool holds no resources).
def busy?
	# `any?` short-circuits on the first match without building an intermediate array:
	@resources.any? {|_, usage| usage > 0}
end

#close ⇒ Object

Close all resources in the pool.



174
175
176
177
178
179
# File 'lib/async/pool/controller.rb', line 174

# Close all resources in the pool.
#
# Drains the pool, empties the availability list, then stops the gardener if one is set.
def close
	drain
	@available.clear
	
	# The gardener is optional, hence the safe navigation:
	@gardener&.stop
end

#concurrency ⇒ Object



85
86
87
# File 'lib/async/pool/controller.rb', line 85

# The maximum number of concurrent tasks that can be creating a new resource,
# as reported by the guard semaphore's limit.
def concurrency
	return @guard.limit
end

#concurrency=(value) ⇒ Object

Set the maximum number of concurrent tasks that can be creating a new resource.



90
91
92
# File 'lib/async/pool/controller.rb', line 90

# Set the maximum number of concurrent tasks that can be creating a new resource.
#
# The value is written straight through to the guard semaphore's limit.
def concurrency=(value)
	@guard.limit = value
end

#drain ⇒ Object

Retire every existing resource that can be acquired from the pool.



164
165
166
167
168
169
170
171
# File 'lib/async/pool/controller.rb', line 164

# Drain the pool: keep taking resources from `acquire_existing_resource` and retiring them
# until it no longer returns one.
def drain
	Console.debug(self, "Draining pool...", size: @resources.size)
	
	# Each existing resource that can be acquired is retired in turn:
	while (existing = acquire_existing_resource)
		retire(existing)
	end
end

#empty? ⇒ Boolean

Whether the pool is empty.

Returns:

  • (Boolean)


133
134
135
# File 'lib/async/pool/controller.rb', line 133

# Whether the pool is empty, i.e. it holds no resources at all.
def empty?
	@resources.size.zero?
end

#prune(retain = 0) ⇒ Object

Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/async/pool/controller.rb', line 184

# Retire (and close) all unused resources. If a block is provided, it is called with each
# unused resource instead, and should implement the desired functionality for it.
#
# `retain` is the number of resources the pool should keep: processing stops as soon as the
# pool holds `retain` resources or fewer, so a pool that is already at or below that size is
# left untouched.
#
# Returns the number of unused resources that were found, which can exceed the number
# actually processed when `retain` stops the loop early.
def prune(retain = 0)
	# This code must not context switch:
	unused = @resources.select {|_, usage| usage.zero?}.keys
	
	# It's okay for this to context switch:
	unused.each do |resource|
		# Check before acting, so that nothing is removed once the pool is at or below `retain`:
		break if @resources.size <= retain
		
		if block_given?
			yield resource
		else
			retire(resource)
		end
	end
	
	# Update availability list (in place, so existing references to it stay valid):
	@available.clear
	@resources.each do |resource, usage|
		if usage < resource.concurrency && resource.reusable?
			@available << resource
		end
	end
	
	return unused.size
end

#release(resource) ⇒ Object

Make the resource available and let waiting tasks know that there is something available.



151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/async/pool/controller.rb', line 151

# Return a resource to the pool.
#
# A reusable resource is handed to `reuse`; if it is not reusable, if `reuse` returns a falsy
# value, or if anything raises along the way, the resource is retired instead.
def release(resource)
	reused = false
	
	# A resource that is not good should also not be reusable.
	reused = reuse(resource) if resource.reusable?
	
	# @policy.released(self, resource)
ensure
	# Anything that was not successfully reused is retired:
	retire(resource) unless reused
end

#retire(resource) ⇒ Object

Retire a specific resource.



217
218
219
220
221
222
223
224
225
226
227
# File 'lib/async/pool/controller.rb', line 217

# Retire a specific resource.
#
# The resource is removed from the pool and closed, and the notification is signalled.
# The signal is sent even when closing raises, because the resource has already been removed
# from the pool by then; the exception still propagates to the caller.
#
# Returns `true` when the resource was closed without error.
def retire(resource)
	Console.debug(self) {"Retire #{resource}"}
	
	@resources.delete(resource)
	
	begin
		resource.close
	ensure
		# The resource has already left @resources, so signal regardless of how close went:
		@notification.signal
	end
	
	return true
end

#size ⇒ Object

The number of resources in the pool.



104
105
106
# File 'lib/async/pool/controller.rb', line 104

# The number of resources in the pool.
def size
	@resources.length
end

#policy=(value) ⇒ Object

The pool policy.



95
# File 'lib/async/pool/controller.rb', line 95

# The pool policy.
attr_accessor :policy

#to_json ⇒ Object

Generate a JSON representation of the pool.



80
81
82
# File 'lib/async/pool/controller.rb', line 80

# Generate a JSON representation of the pool.
#
# Builds the representation with #as_json, then forwards every argument to its `to_json`.
def to_json(...)
	representation = as_json
	representation.to_json(...)
end

#to_s ⇒ Object

Generate a human-readable representation of the pool.



61
62
63
64
65
66
67
# File 'lib/async/pool/controller.rb', line 61

def to_s
	if @resources.empty?
		"\#<#{self.class}(#{usage_string})>"
	else
		"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
	end
end

#wait ⇒ Object

Wait until a pool resource has been freed.



128
129
130
# File 'lib/async/pool/controller.rb', line 128

# Wait until a pool resource has been freed, by waiting on the pool's notification.
def wait
	return @notification.wait
end