Class: Celluloid::Supervision::Container::Pool

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Behavior
Defined in:
lib/celluloid/supervision/container/pool.rb,
lib/celluloid/supervision/container/behavior/pool.rb

Overview

Manages a fixed-size pool of actors Delegates work (i.e. methods) and supervises actors Don’t use this class directly. Instead use MyKlass.pool

Constant Summary

Constants included from Celluloid

BARE_OBJECT_WARNING_MESSAGE, LINKING_TIMEOUT, OWNER_IVAR, VERSION

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Behavior

[], []=, configure, included, injections, parameter

Methods included from Celluloid

#abort, actor?, #after, #async, boot, #call_chain_id, cores, #current_actor, #defer, detect_recursion, #every, exception_handler, #exclusive, #exclusive?, #future, included, init, #link, #linked_to?, #links, mailbox, #monitor, #monitoring?, public_registry, publish, #receive, register_shutdown, running?, shutdown, #signal, #sleep, stack_dump, stack_summary, start, supervise, suspend, #tasks, #terminate, #timeout, #unlink, #unmonitor, uuid, version, #wait

Constructor Details

#initialize(options = {}) ⇒ Pool

Returns a new instance of Pool.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/celluloid/supervision/container/pool.rb', line 15

def initialize(options = {})
  @idle = []
  @busy = []
  @klass = options[:actors]
  @actors = Set.new
  @mutex = Mutex.new

  @size = options[:size] || [Celluloid.cores || 2, 2].max
  @args = options[:args] ? Array(options[:args]) : []

  # Do this last since it can suspend and/or crash
  @idle = @size.times.map { __spawn_actor__ }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



191
192
193
194
195
196
197
# File 'lib/celluloid/supervision/container/pool.rb', line 191

def method_missing(method, *args, &block)
  if respond_to?(method)
    _send_ method, *args, &block
  else
    super
  end
end

Instance Attribute Details

#actorsObject (readonly)

Returns the value of attribute actors.



13
14
15
# File 'lib/celluloid/supervision/container/pool.rb', line 13

def actors
  @actors
end

#sizeObject

Returns the value of attribute size.



13
14
15
# File 'lib/celluloid/supervision/container/pool.rb', line 13

def size
  @size
end

Class Method Details

.pooling_options(config = {}, mixins = {}) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/celluloid/supervision/container/behavior/pool.rb', line 50

def pooling_options(config = {}, mixins = {})
  combined = { type: Celluloid::Supervision::Container::Pool }.merge(config).merge(mixins)
  combined[:args] = [%i[block actors size args].each_with_object({}) do |p, e|
    e[p] = combined.delete(p) if combined[p]
  end]
  combined
end

Instance Method Details

#__busyObject



121
122
123
# File 'lib/celluloid/supervision/container/pool.rb', line 121

def __busy
  @mutex.synchronize { @busy }
end

#__busy?(actor) ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/celluloid/supervision/container/pool.rb', line 117

def __busy?(actor)
  @mutex.synchronize { @busy.include? actor }
end

#__crash_handler__(actor, reason) ⇒ Object

Spawn a new worker for every crashed one



160
161
162
163
164
165
166
167
# File 'lib/celluloid/supervision/container/pool.rb', line 160

def __crash_handler__(actor, reason)
  @busy.delete actor
  @idle.delete actor
  @actors.delete actor
  return unless reason
  @idle << __spawn_actor__
  signal :respawn_complete
end

#__idleObject



125
126
127
# File 'lib/celluloid/supervision/container/pool.rb', line 125

def __idle
  @mutex.synchronize { @idle }
end

#__idle?(actor) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/celluloid/supervision/container/pool.rb', line 113

def __idle?(actor)
  @mutex.synchronize { @idle.include? actor }
end

#__provision_actor__Object

Provision a new actor ( take it out of idle, move it into busy, and avail it )



144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/celluloid/supervision/container/pool.rb', line 144

def __provision_actor__
  Task.current.guard_warnings = true
  @mutex.synchronize do
    while @idle.empty?
      # Wait for responses from one of the busy actors
      response = exclusive { receive { |msg| msg.is_a?(Internals::Response) } }
      Thread.current[:celluloid_actor].handle_message(response)
    end

    actor = @idle.shift
    @busy << actor
    actor
  end
end

#__shutdown__Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/celluloid/supervision/container/pool.rb', line 29

def __shutdown__
  return unless defined?(@actors) && @actors
  # TODO: these can be nil if initializer crashes
  terminators = @actors.map do |actor|
    begin
      actor.future(:terminate)
    rescue DeadActorError
    end
  end

  terminators.compact.each { |terminator| terminator.value rescue nil }
end

#__spawn_actor__Object

Instantiate an actor, add it to the actor Set, and return it



136
137
138
139
140
141
# File 'lib/celluloid/supervision/container/pool.rb', line 136

def __spawn_actor__
  actor = @klass.new_link(*@args)
  @mutex.synchronize { @actors.add(actor) }
  @actors.add(actor)
  actor
end

#__state(actor) ⇒ Object



129
130
131
132
133
# File 'lib/celluloid/supervision/container/pool.rb', line 129

def __state(actor)
  return :busy if __busy?(actor)
  return :idle if __idle?(actor)
  :missing
end

#_send_(method, *args, &block) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/celluloid/supervision/container/pool.rb', line 42

def _send_(method, *args, &block)
  actor = __provision_actor__
  begin
    actor._send_ method, *args, &block
  rescue DeadActorError # if we get a dead actor out of the pool
    wait :respawn_complete
    actor = __provision_actor__
    retry
  rescue ::Exception => ex
    abort ex
  ensure
    if actor.alive?
      @idle << actor
      @busy.delete actor

      # Broadcast that actor is done processing and
      # waiting idle
      signal :actor_idle
    end
  end
end

#busy_sizeObject



105
106
107
# File 'lib/celluloid/supervision/container/pool.rb', line 105

def busy_size
  @mutex.synchronize { @busy.length }
end

#idle_sizeObject



109
110
111
# File 'lib/celluloid/supervision/container/pool.rb', line 109

def idle_size
  @mutex.synchronize { @idle.length }
end

#inspectObject



84
85
86
# File 'lib/celluloid/supervision/container/pool.rb', line 84

def inspect
  _send_ :inspect
end

#is_a?(klass) ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/celluloid/supervision/container/pool.rb', line 68

def is_a?(klass)
  _send_ :is_a?, klass
end

#kind_of?(klass) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/celluloid/supervision/container/pool.rb', line 72

def kind_of?(klass)
  _send_ :kind_of?, klass
end

#method(meth) ⇒ Object

Since Pool allocates worker objects only just before calling them, we can still help Celluloid::Call detect passing invalid parameters to async methods by checking for those methods on the worker class



202
203
204
205
206
# File 'lib/celluloid/supervision/container/pool.rb', line 202

def method(meth)
  super
rescue NameError
  @klass.instance_method(meth.to_sym)
end

#methods(include_ancestors = true) ⇒ Object



76
77
78
# File 'lib/celluloid/supervision/container/pool.rb', line 76

def methods(include_ancestors = true)
  _send_ :methods, include_ancestors
end

#nameObject



64
65
66
# File 'lib/celluloid/supervision/container/pool.rb', line 64

def name
  _send_ @mailbox, :name
end

#respond_to?(meth, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/celluloid/supervision/container/pool.rb', line 169

def respond_to?(meth, include_private = false)
  # NOTE: use method() here since this class
  # shouldn't be used directly, and method() is less
  # likely to be "reimplemented" inconsistently
  # with other Object.*method* methods.

  found = method(meth)
  if include_private
    found ? true : false
  else
    if found.is_a?(UnboundMethod)
      found.owner.public_instance_methods.include?(meth) ||
        found.owner.protected_instance_methods.include?(meth)
    else
      found.receiver.public_methods.include?(meth) ||
        found.receiver.protected_methods.include?(meth)
    end
  end
rescue NameError
  false
end

#to_sObject



80
81
82
# File 'lib/celluloid/supervision/container/pool.rb', line 80

def to_s
  _send_ :to_s
end