Class: Async::Pool::Controller

Inherits:
Object
  • Object
show all
Defined in:
lib/async/pool/controller.rb

Overview

A resource pool controller.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil) ⇒ Controller

Create a new resource pool.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/async/pool/controller.rb', line 29

# Create a new resource pool.
#
# @param constructor stored as-is on the pool as the resource constructor.
# @param limit stored as the pool's resource limit (defaults to `nil`).
# @param concurrency limit handed to the guard semaphore; defaults to `limit`, or 1 when `limit` is not given.
# @param policy stored as the pool policy (defaults to `nil`).
# @param tags stored as the pool tags (defaults to `nil`).
def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil, tags: nil)
	# Configuration supplied by the caller:
	@constructor = constructor
	@limit = limit
	@policy = policy
	@tags = tags
	
	# Semaphore which bounds how many tasks may be creating new resources at the same time:
	@guard = Async::Semaphore.new(concurrency)
	
	# No gardener has been assigned yet:
	@gardener = nil
	
	# All resources tracked by the pool:
	@resources = {}
	
	# Candidates for acquisition. This list may contain false positives, i.e. resources which were okay when listed but have since become unusable:
	@available = []
	
	# Signalled when a resource has been released:
	@notification = Async::Notification.new
end

Instance Attribute Details

#resources ⇒ Object (readonly)

All allocated resources, and their associated usage.



96
# File 'lib/async/pool/controller.rb', line 96

# Read-only access to all allocated resources and their associated usage.
attr_reader :resources

#constructor ⇒ Object (readonly)

Returns the value of attribute constructor.



53
54
55
# File 'lib/async/pool/controller.rb', line 53

# Reader for the `constructor` attribute.
def constructor
	return @constructor
end

#limit ⇒ Object

Returns the value of attribute limit.



56
57
58
# File 'lib/async/pool/controller.rb', line 56

# Reader for the `limit` attribute.
def limit
	return @limit
end

#policy ⇒ Object

Returns the value of attribute policy.



93
94
95
# File 'lib/async/pool/controller.rb', line 93

# Reader for the `policy` attribute.
def policy
	return @policy
end

#resources ⇒ Object (readonly)

Returns the value of attribute resources.



96
97
98
# File 'lib/async/pool/controller.rb', line 96

# Reader for the `resources` attribute.
def resources
	return @resources
end

#tags ⇒ Object

Returns the value of attribute tags.



99
100
101
# File 'lib/async/pool/controller.rb', line 99

# Reader for the `tags` attribute.
def tags
	return @tags
end

#constructor ⇒ Object (readonly)

The constructor used to create new resources.



53
# File 'lib/async/pool/controller.rb', line 53

# Read-only access to the constructor used to create new resources.
attr_reader :constructor

#concurrency ⇒ Object

The maximum number of concurrent tasks that can be creating a new resource.



83
84
85
# File 'lib/async/pool/controller.rb', line 83

# The maximum number of concurrent tasks that can be creating a new resource.
# This is read from the guard semaphore's limit.
def concurrency
	return @guard.limit
end

#limit ⇒ Object

The maximum number of resources that this pool can have at any given time.



56
# File 'lib/async/pool/controller.rb', line 56

# The maximum number of resources that this pool can have at any given time. Readable and writable.
attr_accessor :limit

#tags ⇒ Object

The name of the pool.



99
# File 'lib/async/pool/controller.rb', line 99

# The pool's tags. Readable and writable.
# NOTE(review): the generated documentation describes this attribute as "The name of the pool." — confirm the intended meaning.
attr_accessor :tags

Class Method Details

.wrap(**options, &block) ⇒ Object

Create a new resource pool, using the given block to create new resources.



19
20
21
# File 'lib/async/pool/controller.rb', line 19

# Create a new resource pool, using the given block as the constructor for new resources.
# All keyword options are forwarded unchanged to `new`.
def self.wrap(**options, &block)
	return new(block, **options)
end

Instance Method Details

#acquire ⇒ Object

Acquire a resource from the pool. If a block is provided, the resource will be released after the block has been executed.



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/async/pool/controller.rb', line 136

# Acquire a resource from the pool.
#
# Without a block, the resource is returned and the caller is responsible for releasing it.
# With a block, the resource is yielded and released once the block finishes, even if it raises; the block's result is returned.
def acquire
	resource = wait_for_resource
	
	if block_given?
		begin
			yield resource
		ensure
			release(resource)
		end
	else
		resource
	end
end

#active? ⇒ Boolean

Whether the pool has any active resources.

Returns:

  • (Boolean)


107
108
109
# File 'lib/async/pool/controller.rb', line 107

# Whether the pool has any active resources, i.e. whether the resource table is non-empty.
def active?
	return @resources.any?
end

#as_json ⇒ Object

Generate a JSON representation of the pool.



68
69
70
71
72
73
74
75
# File 'lib/async/pool/controller.rb', line 68

# Generate a JSON-compatible Hash describing the pool: its limit, creation concurrency, number of resources and availability summary.
# Any arguments are accepted and ignored.
def as_json(...)
	representation = {}
	
	representation[:limit] = @limit
	representation[:concurrency] = @guard.limit
	representation[:usage] = @resources.size
	representation[:availability_summary] = availability_summary
	
	return representation
end

#available? ⇒ Boolean

Whether there are available resources, i.e. whether #acquire can reuse an existing resource.

Returns:

  • (Boolean)


121
122
123
# File 'lib/async/pool/controller.rb', line 121

# Whether the availability list has any entries, i.e. whether #acquire may be able to reuse an existing resource.
def available?
	return @available.any?
end

#busy? ⇒ Boolean

Whether there are resources which are currently in use.

Returns:

  • (Boolean)


112
113
114
115
116
117
118
# File 'lib/async/pool/controller.rb', line 112

# Whether there are resources which are currently in use, i.e. whether any resource has a usage count greater than zero.
#
# @return [Boolean] true if at least one resource is in use, false otherwise (including when the pool is empty).
def busy?
	# `any?` stops at the first match and does not build an intermediate array:
	@resources.each_value.any? { |usage| usage > 0 }
end

#close ⇒ Object

Drain the pool, clear all resources, and stop the gardener.



173
174
175
176
177
178
# File 'lib/async/pool/controller.rb', line 173

# Drain the pool, clear the availability list, and stop the gardener if one is set.
def close
	drain
	@available.clear
	
	# Nothing to stop when no gardener has been assigned:
	@gardener.stop unless @gardener.nil?
end

#concurrency ⇒ Object



83
84
85
# File 'lib/async/pool/controller.rb', line 83

# The current limit of the guard semaphore.
def concurrency
	return @guard.limit
end

#concurrency=(value) ⇒ Object

Set the maximum number of concurrent tasks that can be creating a new resource.



88
89
90
# File 'lib/async/pool/controller.rb', line 88

# Set the maximum number of concurrent tasks that can be creating a new resource.
# The value is written to the guard semaphore's limit.
def concurrency=(value)
	@guard.limit = value
end

#drain ⇒ Object

Drain the pool, closing all resources.



163
164
165
166
167
168
169
170
# File 'lib/async/pool/controller.rb', line 163

# Drain the pool: keep acquiring existing resources and retiring them until none can be acquired.
def drain
	Console.debug(self, "Draining pool...", size: @resources.size)
	
	# Each acquired resource is retired straight away; the loop ends once no existing resource is returned:
	while (existing = acquire_existing_resource)
		retire(existing)
	end
end

#empty? ⇒ Boolean

Whether the pool is empty.

Returns:

  • (Boolean)


131
132
133
# File 'lib/async/pool/controller.rb', line 131

# Whether the pool is empty, i.e. it currently tracks no resources.
def empty?
	return @resources.empty?
end

#prune(retain = 0) ⇒ Object

Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/async/pool/controller.rb', line 183

# Retire (and close) unused resources until at most `retain` resources remain in the pool.
#
# If a block is provided, each unused resource is yielded to it instead of being retired, so the block should implement the desired functionality for unused resources.
#
# @param retain [Integer] the pool size at or below which pruning stops. A pool that is already at or below this size is left untouched.
# @return [Integer] the number of resources that had zero usage when pruning started (not necessarily the number that were retired).
def prune(retain = 0)
	# Snapshot the unused resources. This code must not context switch:
	unused = @resources.select { |_, usage| usage.zero? }.keys
	
	# It's okay for this to context switch:
	unused.each do |resource|
		# Check the threshold before touching a resource, so that the pool never drops below `retain` because of this call:
		break if @resources.size <= retain
		
		if block_given?
			yield resource
		else
			retire(resource)
		end
	end
	
	# Rebuild the availability list from whatever resources remain:
	@available.clear
	@resources.each do |resource, usage|
		if usage < resource.concurrency && resource.reusable?
			@available << resource
		end
	end
	
	return unused.size
end

#release(resource) ⇒ Object

Make the resource available and let waiting tasks know that there is something available.



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/async/pool/controller.rb', line 149

# Hand a resource back to the pool.
#
# A reusable resource is passed to `reuse`. If the resource is not reusable, if `reuse` returns a falsy value, or if an error is raised along the way, the resource is retired instead.
def release(resource)
	reused = false
	
	# A resource that is not good should also not be reusable.
	reused = reuse(resource) if resource.reusable?
	
	# @policy.released(self, resource)
ensure
	# Runs on every exit path, including when an exception is propagating:
	retire(resource) unless reused
end

#retire(resource) ⇒ Object

Retire a specific resource.



216
217
218
219
220
221
222
223
224
225
226
# File 'lib/async/pool/controller.rb', line 216

# Retire a specific resource: remove it from the pool, close it, and signal the pool's notification.
#
# @return [true] always.
def retire(resource)
	Console.debug(self) do
		"Retire #{resource}"
	end
	
	# Forget the resource first, then close it:
	@resources.delete(resource)
	resource.close
	
	# Signal the notification that #wait blocks on:
	@notification.signal
	
	true
end

#size ⇒ Object

The number of resources in the pool.



102
103
104
# File 'lib/async/pool/controller.rb', line 102

# The number of resources in the pool.
def size
	return @resources.length
end

#policy=(value) ⇒ Object

The pool policy.



93
# File 'lib/async/pool/controller.rb', line 93

# The pool policy. Readable and writable.
attr_accessor :policy

#to_json ⇒ Object

Generate a JSON representation of the pool.



78
79
80
# File 'lib/async/pool/controller.rb', line 78

# Generate a JSON representation of the pool by serializing the result of #as_json.
# All arguments are forwarded to the serializer; none are passed to #as_json.
def to_json(...)
	representation = as_json
	
	return representation.to_json(...)
end

#to_s ⇒ Object

Generate a human-readable representation of the pool.



59
60
61
62
63
64
65
# File 'lib/async/pool/controller.rb', line 59

# Generate a human-readable representation of the pool.
# The availability summary is only included when the pool holds at least one resource.
def to_s
	return "\#<#{self.class}(#{usage_string})>" if @resources.empty?
	
	"\#<#{self.class}(#{usage_string}) #{availability_summary.join(';')}>"
end

#wait ⇒ Object

Wait until a pool resource has been freed.



126
127
128
# File 'lib/async/pool/controller.rb', line 126

# Wait until a pool resource has been freed, by waiting on the pool's notification.
def wait
	return @notification.wait
end