Class: Celluloid::Supervision::Container::Pool
- Inherits:
- Object
- Includes:
- Celluloid, Behavior
- Defined in:
- lib/celluloid/supervision/container/pool.rb,
lib/celluloid/supervision/container/behavior/pool.rb
Overview
Manages a fixed-size pool of actors, delegates work (i.e. method calls) to them, and supervises them. Don’t use this class directly; instead use MyKlass.pool.
Constant Summary
Constants included from Celluloid
BARE_OBJECT_WARNING_MESSAGE, LINKING_TIMEOUT, OWNER_IVAR, VERSION
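Instance Attribute Summary
-
#actors ⇒ Object
Returns the value of attribute actors.
-
#size ⇒ Object
Returns the value of attribute size.
Class Method Summary
-
.pooling_options(config = {}, mixins = {}) ⇒ Object
Instance Method Summary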
-
#__busy ⇒ Object
-
#__busy?(actor) ⇒ Boolean
-
#__crash_handler__(actor, reason) ⇒ Object
Spawn a new worker for every crashed one.
-
#__idle ⇒ Object
-
#__idle?(actor) ⇒ Boolean
-
#__provision_actor__ ⇒ Object
Provision a new actor (take it out of idle, move it into busy, and make it available).
-
#__shutdown__ ⇒ Object
-
#__spawn_actor__ ⇒ Object
Instantiate an actor, add it to the actor Set, and return it.
-
#__state(actor) ⇒ Object
-
#_send_(method, *args, &block) ⇒ Object
-
#busy_size ⇒ Object
-
#idle_size ⇒ Object
-
#initialize(options = {}) ⇒ Pool
constructor
-
#inspect ⇒ Object
-
#is_a?(klass) ⇒ Boolean
-
#kind_of?(klass) ⇒ Boolean
-
#method(meth) ⇒ Object
Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class.
-
#method_missing(method, *args, &block) ⇒ Object
-
#methods(include_ancestors = true) ⇒ Object
-
#name ⇒ Object
-
#respond_to?(meth, include_private = false) ⇒ Boolean
-
#to_s ⇒ Object
Methods included from Behavior
[], []=, configure, included, injections, parameter
Methods included from Celluloid
#abort, actor?, #after, #async, boot, #call_chain_id, cores, #current_actor, #defer, detect_recursion, #every, exception_handler, #exclusive, #exclusive?, #future, included, init, #link, #linked_to?, #links, mailbox, #monitor, #monitoring?, public_registry, publish, #receive, register_shutdown, running?, shutdown, #signal, #sleep, stack_dump, stack_summary, start, supervise, suspend, #tasks, #terminate, #timeout, #unlink, #unmonitor, uuid, version, #wait
Constructor Details
#initialize(options = {}) ⇒ Pool
Returns a new instance of Pool.
# File 'lib/celluloid/supervision/container/pool.rb', line 15
def initialize(options = {})
  @idle = []
  @busy = []
  @klass = options[:actors]
  @actors = Set.new
  @mutex = Mutex.new
  @size = options[:size] || [Celluloid.cores || 2, 2].max
  @args = options[:args] ? Array(options[:args]) : []
  @idle = @size.times.map { __spawn_actor__ }
end
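The sizing and argument handling in the constructor can be tried in isolation. The sketch below is a hypothetical helper, not part of Celluloid: `pool_defaults` is an invented name, and its `cores` parameter stands in for `Celluloid.cores`, which the constructor treats as possibly nil.

```ruby
# Hypothetical helper mirroring the constructor's defaults.
# `cores` stands in for Celluloid.cores and may be nil.
def pool_defaults(options = {}, cores = nil)
  # Explicit :size wins; otherwise use the core count, but never below 2.
  size = options[:size] || [cores || 2, 2].max
  # A single argument is wrapped in an array; a missing one becomes [].
  args = options[:args] ? Array(options[:args]) : []
  { size: size, args: args }
end

pool_defaults({}, nil)                  # size falls back to 2
pool_defaults({}, 8)                    # size follows the core count
pool_defaults({ size: 3, args: "x" })   # args becomes ["x"]
```

Under these assumptions, a single-core machine still gets two workers, because the default is clamped to a minimum of 2.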
Dynamic Method Handling
This class handles dynamic methods through the method_missing method.
#method_missing(method, *args, &block) ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 191
def method_missing(method, *args, &block)
  if respond_to?(method)
    _send_ method, *args, &block
  else
    super
  end
end
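The forwarding idea can be illustrated without Celluloid. The following is a minimal sketch under stated assumptions: `Greeter` and `ForwardingProxy` are invented names, the proxy holds a single worker instead of a pool, and it uses Ruby's `respond_to_missing?` hook where Pool overrides `respond_to?` directly.

```ruby
# Toy worker class; not part of Celluloid.
class Greeter
  def greet(name)
    "hello, #{name}"
  end
end

# Hypothetical proxy that forwards any call the worker class understands.
class ForwardingProxy
  def initialize(klass)
    @klass = klass
    @worker = klass.new
  end

  # Report worker methods as supported so respond_to? stays truthful.
  def respond_to_missing?(name, include_private = false)
    @klass.public_method_defined?(name) || super
  end

  # Forward known methods; let everything else raise NoMethodError.
  def method_missing(name, *args, &block)
    if respond_to?(name)
      @worker.public_send(name, *args, &block)
    else
      super
    end
  end
end

proxy = ForwardingProxy.new(Greeter)
proxy.greet("pool")
```

Calling `super` for unknown names keeps the usual `NoMethodError` behavior, which is the same fallback the listing above uses.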
Instance Attribute Details
#actors ⇒ Object
Returns the value of attribute actors.
# File 'lib/celluloid/supervision/container/pool.rb', line 13
def actors
  @actors
end
#size ⇒ Object
Returns the value of attribute size.
# File 'lib/celluloid/supervision/container/pool.rb', line 13
def size
  @size
end
Class Method Details
.pooling_options(config = {}, mixins = {}) ⇒ Object
# File 'lib/celluloid/supervision/container/behavior/pool.rb', line 50
def pooling_options(config = {}, mixins = {})
  combined = { type: Celluloid::Supervision::Container::Pool }.merge(config).merge(mixins)
  combined[:args] = [%i[block actors size args].each_with_object({}) do |p, e|
    e[p] = combined.delete(p) if combined[p]
  end]
  combined
end
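To see what shape of hash this produces, the same logic can be run standalone. In the sketch below, `pooling_options_sketch` is an invented name and the symbol `:pool_placeholder` stands in for the `Celluloid::Supervision::Container::Pool` class, so no gem is needed; the `:as` key is only an arbitrary example of an option that is not moved.

```ruby
# Standalone copy of the option-shuffling logic shown above.
# :pool_placeholder stands in for the real Pool class.
def pooling_options_sketch(config = {}, mixins = {})
  combined = { type: :pool_placeholder }.merge(config).merge(mixins)
  # Move the pool-specific keys into a single options hash under :args.
  combined[:args] = [%i[block actors size args].each_with_object({}) do |key, extracted|
    extracted[key] = combined.delete(key) if combined[key]
  end]
  combined
end

result = pooling_options_sketch({ actors: String, size: 3 }, { as: :workers })
# result[:args] is a one-element array holding { actors: String, size: 3 };
# :type and :as stay at the top level.
```

The pool-specific keys end up wrapped in a one-element array, which matches the single `options` hash that the constructor accepts.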
Instance Method Details
#__busy ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 121
def __busy
  @mutex.synchronize { @busy }
end
#__busy?(actor) ⇒ Boolean
# File 'lib/celluloid/supervision/container/pool.rb', line 117
def __busy?(actor)
  @mutex.synchronize { @busy.include? actor }
end
#__crash_handler__(actor, reason) ⇒ Object
Spawn a new worker for every crashed one.
# File 'lib/celluloid/supervision/container/pool.rb', line 160
def __crash_handler__(actor, reason)
  @busy.delete actor
  @idle.delete actor
  @actors.delete actor
  return unless reason
  @idle << __spawn_actor__
  signal :respawn_complete
end
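The bookkeeping in the crash handler can be modeled with plain collections. This is a rough sketch, not Celluloid code: `handle_exit` is an invented name, the registry is a plain hash, and `spawn` is any callable that builds a replacement worker. As in the listing, a nil `reason` removes the worker without replacing it.

```ruby
require "set"

# Hypothetical model of the crash handler's bookkeeping.
# registry: { idle: Array, busy: Array, actors: Set }
def handle_exit(registry, actor, reason, spawn)
  # Forget the departed worker everywhere it might be tracked.
  registry[:busy].delete(actor)
  registry[:idle].delete(actor)
  registry[:actors].delete(actor)
  return registry unless reason # no reason given: do not respawn

  # Replace the crashed worker so the pool keeps its size.
  replacement = spawn.call
  registry[:actors].add(replacement)
  registry[:idle] << replacement
  registry
end

registry = { idle: [:a], busy: [:b], actors: Set[:a, :b] }
handle_exit(registry, :b, RuntimeError.new("boom"), -> { :c })
```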
#__idle ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 125
def __idle
  @mutex.synchronize { @idle }
end
#__idle?(actor) ⇒ Boolean
# File 'lib/celluloid/supervision/container/pool.rb', line 113
def __idle?(actor)
  @mutex.synchronize { @idle.include? actor }
end
#__provision_actor__ ⇒ Object
Provision a new actor (take it out of idle, move it into busy, and make it available).
# File 'lib/celluloid/supervision/container/pool.rb', line 144
def __provision_actor__
  Task.current.guard_warnings = true
  @mutex.synchronize do
    while @idle.empty?
      response = exclusive { receive { |msg| msg.is_a?(Internals::Response) } }
      Thread.current[:celluloid_actor].handle_message(response)
    end
    actor = @idle.shift
    @busy << actor
    actor
  end
end
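The idle-to-busy handoff can be sketched with the standard library alone. Note the swapped-in technique: where the listing above waits by receiving actor responses, this sketch blocks on a `ConditionVariable`. `CheckoutList` and its methods are invented names for illustration.

```ruby
# Hypothetical checkout list; waits on a ConditionVariable rather than
# on Celluloid's receive loop.
class CheckoutList
  def initialize(workers)
    @idle = workers.dup
    @busy = []
    @mutex = Mutex.new
    @available = ConditionVariable.new
  end

  # Block until a worker is idle, then move it to the busy list.
  def checkout
    @mutex.synchronize do
      @available.wait(@mutex) while @idle.empty?
      worker = @idle.shift
      @busy << worker
      worker
    end
  end

  # Return a worker to the idle list and wake one waiting caller.
  def checkin(worker)
    @mutex.synchronize do
      @busy.delete(worker)
      @idle << worker
      @available.signal
    end
  end

  def sizes
    @mutex.synchronize { { idle: @idle.length, busy: @busy.length } }
  end
end

list = CheckoutList.new([:w1, :w2])
worker = list.checkout
list.checkin(worker)
```

Re-checking `@idle.empty?` in a `while` loop guards against spurious wakeups, much as the original loops until an idle actor actually appears.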
#__shutdown__ ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 29
def __shutdown__
  return unless defined?(@actors) && @actors
  terminators = @actors.map do |actor|
    begin
      actor.future(:terminate)
    rescue DeadActorError
    end
  end
  terminators.compact.each { |terminator| terminator.value rescue nil }
end
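The shutdown routine follows a two-phase pattern: start every termination first, then wait for all of them, ignoring individual failures. A minimal sketch of that pattern is shown here, assuming plain threads as a stand-in for Celluloid futures; `SlowWorker` and `stop_all` are invented names.

```ruby
# Hypothetical worker with a slow stop; not part of Celluloid.
class SlowWorker
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    sleep 0.01
    @stopped = true
  end
end

# Thread stands in for a Celluloid future in this sketch.
def stop_all(workers)
  # Phase 1: start every stop without waiting for any of them.
  pending = workers.map do |worker|
    Thread.new do
      Thread.current.report_on_exception = false
      worker.stop
    end
  end
  # Phase 2: wait for each result, ignoring individual failures.
  pending.each do |thread|
    begin
      thread.value
    rescue StandardError
      nil
    end
  end
  nil
end

workers = Array.new(3) { SlowWorker.new }
stop_all(workers)
```

Starting all stops before waiting lets slow workers shut down concurrently instead of one after another.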
#__spawn_actor__ ⇒ Object
Instantiate an actor, add it to the actor Set, and return it.
# File 'lib/celluloid/supervision/container/pool.rb', line 136
def __spawn_actor__
  actor = @klass.new_link(*@args)
  @mutex.synchronize { @actors.add(actor) }
  @actors.add(actor)
  actor
end
#__state(actor) ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 129
def __state(actor)
  return :busy if __busy?(actor)
  return :idle if __idle?(actor)
  :missing
end
#_send_(method, *args, &block) ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 42
def _send_(method, *args, &block)
  actor = __provision_actor__
  begin
    actor._send_ method, *args, &block
  rescue DeadActorError
    wait :respawn_complete
    actor = __provision_actor__
    retry
  rescue ::Exception => ex
    abort ex
  ensure
    if actor.alive?
      @idle << actor
      @busy.delete actor
      signal :actor_idle
    end
  end
end
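The rescue-and-retry flow in `_send_` can be reduced to a small stand-alone example. This is a sketch under stated assumptions: `WorkerGone` stands in for `DeadActorError`, `FlakyWorker` and `call_with_retry` are invented names, and the idle list is a plain array. The retry sits in an inner `begin` so that the outer `ensure` clearly runs once, after the final attempt.

```ruby
# Hypothetical error standing in for Celluloid's DeadActorError.
class WorkerGone < StandardError; end

# Toy worker that fails when it is not alive.
class FlakyWorker
  def initialize(alive)
    @alive = alive
  end

  def alive?
    @alive
  end

  def double(n)
    raise WorkerGone unless @alive
    n * 2
  end
end

def call_with_retry(idle, method, *args)
  worker = idle.shift
  begin
    begin
      worker.public_send(method, *args)
    rescue WorkerGone
      # The worker was dead: take the next one and try again.
      worker = idle.shift
      retry
    end
  ensure
    # Only a live worker goes back on the idle list.
    idle << worker if worker&.alive?
  end
end

idle = [FlakyWorker.new(false), FlakyWorker.new(true)]
call_with_retry(idle, :double, 21)
```

After the call, the dead worker is gone from the list and the live one has been returned to it.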
#busy_size ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 105
def busy_size
  @mutex.synchronize { @busy.length }
end
#idle_size ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 109
def idle_size
  @mutex.synchronize { @idle.length }
end
#inspect ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 84
def inspect
  _send_ :inspect
end
#is_a?(klass) ⇒ Boolean
# File 'lib/celluloid/supervision/container/pool.rb', line 68
def is_a?(klass)
  _send_ :is_a?, klass
end
#kind_of?(klass) ⇒ Boolean
# File 'lib/celluloid/supervision/container/pool.rb', line 72
def kind_of?(klass)
  _send_ :kind_of?, klass
end
#method(meth) ⇒ Object
Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class.
# File 'lib/celluloid/supervision/container/pool.rb', line 202
def method(meth)
  super
rescue NameError
  @klass.instance_method(meth.to_sym)
end
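The effect of this fallback is that a lookup returns a bound `Method` for the proxy's own methods and an `UnboundMethod` for methods that exist only on the worker class. A small sketch with invented names (`Echo`, `LazyProxy`) illustrates this using only core Ruby:

```ruby
# Toy worker class; not part of Celluloid.
class Echo
  def shout(text)
    text.upcase
  end
end

# Hypothetical proxy that knows the worker class but holds no instance.
class LazyProxy
  def initialize(klass)
    @klass = klass
  end

  # Try the proxy itself first, then fall back to the worker class.
  def method(name)
    super
  rescue NameError
    @klass.instance_method(name.to_sym)
  end
end

proxy = LazyProxy.new(Echo)
own = proxy.method(:to_s)       # a bound Method on the proxy
worker = proxy.method(:shout)   # an UnboundMethod owned by Echo
worker.arity                    # arity is available without an instance
```

Because an `UnboundMethod` still exposes `arity` and `parameters`, argument counts can be checked before any worker is allocated.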
#methods(include_ancestors = true) ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 76
def methods(include_ancestors = true)
  _send_ :methods, include_ancestors
end
#name ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 64
def name
  _send_ @mailbox, :name
end
#respond_to?(meth, include_private = false) ⇒ Boolean
# File 'lib/celluloid/supervision/container/pool.rb', line 169
def respond_to?(meth, include_private = false)
  found = method(meth)
  if include_private
    found ? true : false
  else
    if found.is_a?(UnboundMethod)
      found.owner.public_instance_methods.include?(meth) ||
        found.owner.protected_instance_methods.include?(meth)
    else
      found.receiver.public_methods.include?(meth) ||
        found.receiver.protected_methods.include?(meth)
    end
  end
rescue NameError
  false
end
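The visibility check for the `UnboundMethod` branch can be tried on an ordinary class. In this sketch `Vault` and `worker_responds?` are invented names, and only the class-level branch is modeled: a private method is found by `instance_method` but reported as unsupported unless `include_private` is set.

```ruby
# Toy worker class with one public and one private method.
class Vault
  def unlock
    :opened
  end

  private

  def combination
    1234
  end
end

# Hypothetical class-level version of the visibility check above.
def worker_responds?(klass, name, include_private = false)
  found = klass.instance_method(name) # raises NameError if undefined
  return true if include_private

  found.owner.public_instance_methods.include?(name) ||
    found.owner.protected_instance_methods.include?(name)
rescue NameError
  false
end

worker_responds?(Vault, :unlock)             # public method
worker_responds?(Vault, :combination)        # private, hidden by default
worker_responds?(Vault, :combination, true)  # private, explicitly included
```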
#to_s ⇒ Object
# File 'lib/celluloid/supervision/container/pool.rb', line 80
def to_s
  _send_ :to_s
end