Class: Celluloid::Actor
- Inherits: Object
- Extended by: Registry
- Defined in: lib/celluloid/actor.rb
Overview
Actors are Celluloid’s concurrency primitive. They’re implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.
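The idea in the overview can be sketched in plain Ruby without Celluloid. This is only an illustration: `TinyActor`, `Counter`, and the `[meth, args, reply]` message layout are hypothetical names, and the real actor adds tasks, links, timers and a proxy on top of this.

```ruby
# Hypothetical toy actor, not Celluloid's implementation: a plain object
# whose methods are only ever invoked from one dedicated thread.
class TinyActor
  def initialize(subject)
    @subject = subject
    @mailbox = Queue.new            # thread-safe FIFO from the standard library
    @thread  = Thread.new { run }
  end

  # Fire-and-forget: enqueue the call and return immediately.
  def async(meth, *args)
    @mailbox << [meth, args, nil]
    nil
  end

  # Synchronous call: enqueue the call plus a reply queue, then block on it.
  def call(meth, *args)
    reply = Queue.new
    @mailbox << [meth, args, reply]
    reply.pop
  end

  def terminate
    @mailbox << :terminate
    @thread.join
  end

  private

  # The actor loop: take one message at a time and dispatch it to the subject.
  def run
    loop do
      message = @mailbox.pop
      break if message == :terminate
      meth, args, reply = message
      result = @subject.public_send(meth, *args)
      reply << result if reply
    end
  end
end

class Counter
  attr_reader :count

  def initialize
    @count = 0
  end

  def increment(by = 1)
    @count += by
  end
end

actor = TinyActor.new(Counter.new)
actor.async(:increment, 2)
actor.async(:increment, 3)
puts actor.call(:count)   # the two async calls are processed first (FIFO)
actor.terminate
```

Because the mailbox is a FIFO and only the actor thread touches the subject, the counter needs no locking of its own.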
Instance Attribute Summary
-
#links ⇒ Object
readonly
Returns the value of attribute links.
-
#mailbox ⇒ Object
readonly
Returns the value of attribute mailbox.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#proxy ⇒ Object
readonly
Returns the value of attribute proxy.
-
#subject ⇒ Object
readonly
Returns the value of attribute subject.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Class Method Summary
-
.all ⇒ Object
Obtain all running actors in the system.
-
.async(mailbox, meth, *args, &block) ⇒ Object
Invoke a method asynchronously on an actor via its mailbox.
-
.call(mailbox, meth, *args, &block) ⇒ Object
Invoke a method on the given actor via its mailbox.
-
.current ⇒ Object
Obtain the current actor.
-
.future(mailbox, meth, *args, &block) ⇒ Object
Call a method asynchronously and retrieve its value later.
-
.name ⇒ Object
Obtain the name of the current actor.
Instance Method Summary
-
#after(interval) ⇒ Object
Schedule a block to run once after the given interval.
-
#cleanup(exit_event) ⇒ Object
Clean up after this actor.
-
#every(interval) ⇒ Object
Schedule a block to run repeatedly at the given interval.
-
#exclusive ⇒ Object
Execute a code block in exclusive mode.
-
#exclusive? ⇒ Boolean
Is this actor running in exclusive mode?
-
#handle_crash(exception) ⇒ Object
Handle any exceptions that occur within a running actor.
-
#handle_exit_event(exit_event) ⇒ Object
Handle exit events received by this actor.
-
#handle_message(message) ⇒ Object
Handle an incoming message.
-
#initialize(subject) ⇒ Actor
constructor
Wrap the given subject with an Actor.
-
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message.
-
#run ⇒ Object
Run the actor loop.
-
#run_finalizer ⇒ Object
Run the user-defined finalizer, if one is set.
-
#shutdown(exit_event = ExitEvent.new(@proxy)) ⇒ Object
Handle cleaning up this actor after it exits.
-
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods.
-
#sleep(interval) ⇒ Object
Sleep for the given amount of time.
-
#terminate ⇒ Object
Terminate this actor.
-
#timeout ⇒ Object
How long to wait until the next timer fires.
-
#wait(name) ⇒ Object
Wait for the given signal.
Methods included from Registry
[], []=, registered
Constructor Details
#initialize(subject) ⇒ Actor
Wrap the given subject with an Actor
# File 'lib/celluloid/actor.rb', line 94
def initialize(subject)
  @subject = subject
  @mailbox = subject.class.mailbox_factory
  @tasks = Set.new
  @links = Links.new
  @signals = Signals.new
  @receivers = Receivers.new
  @timers = Timers.new
  @running = true
  @exclusive = false
  @name = nil

  @thread = ThreadHandle.new do
    Thread.current[:actor] = self
    Thread.current[:mailbox] = @mailbox
    run
  end

  @proxy = ActorProxy.new(self)
end
Instance Attribute Details
#links ⇒ Object (readonly)
Returns the value of attribute links.
# File 'lib/celluloid/actor.rb', line 23
def links
  @links
end
#mailbox ⇒ Object (readonly)
Returns the value of attribute mailbox.
# File 'lib/celluloid/actor.rb', line 23
def mailbox
  @mailbox
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/celluloid/actor.rb', line 23
def name
  @name
end
#proxy ⇒ Object (readonly)
Returns the value of attribute proxy.
# File 'lib/celluloid/actor.rb', line 23
def proxy
  @proxy
end
#subject ⇒ Object (readonly)
Returns the value of attribute subject.
# File 'lib/celluloid/actor.rb', line 23
def subject
  @subject
end
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
# File 'lib/celluloid/actor.rb', line 23
def tasks
  @tasks
end
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
# File 'lib/celluloid/actor.rb', line 23
def thread
  @thread
end
Class Method Details
.all ⇒ Object
Obtain all running actors in the system
# File 'lib/celluloid/actor.rb', line 83
def all
  actors = []
  Thread.list.each do |t|
    actor = t[:actor]
    actors << actor.proxy if actor
  end
  actors
end
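The lookup above relies on each actor thread storing itself under the `:actor` thread-local key. A rough standalone sketch of that pattern follows; `spawn_tagged` and `all_tags` are hypothetical helpers, and plain strings stand in for actor objects.

```ruby
# Start a thread that tags itself, and wait until the tag is visible.
# Returns the thread and a "gate" queue used to let it finish.
def spawn_tagged(tag)
  ready = Queue.new
  gate  = Queue.new
  thread = Thread.new do
    Thread.current[:actor] = tag   # same thread-local key as in the source
    ready << true
    gate.pop                       # stay alive until released
  end
  ready.pop                        # block until the tag has been set
  [thread, gate]
end

# Collect the tag of every live thread that has one.
def all_tags
  Thread.list.map { |t| t[:actor] }.compact
end

t1, g1 = spawn_tagged("alpha")
t2, g2 = spawn_tagged("beta")
p all_tags.sort

[g1, g2].each { |gate| gate << :done }
[t1, t2].each(&:join)
p all_tags   # finished threads no longer appear in Thread.list
```

Threads that never set the key, such as the main thread, are skipped, which is why the source guards with `if actor`.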
.async(mailbox, meth, *args, &block) ⇒ Object
Invoke a method asynchronously on an actor via its mailbox
# File 'lib/celluloid/actor.rb', line 64
def async(mailbox, meth, *args, &block)
  begin
    mailbox << AsyncCall.new(Thread.mailbox, meth, args, block)
  rescue MailboxError
    # Silently swallow asynchronous calls to dead actors. There's no way
    # to reliably generate DeadActorErrors for async calls, so users of
    # async calls should find other ways to deal with actors dying
    # during an async call (i.e. linking/supervisors)
  end
end
.call(mailbox, meth, *args, &block) ⇒ Object
Invoke a method on the given actor via its mailbox
# File 'lib/celluloid/actor.rb', line 41
def call(mailbox, meth, *args, &block)
  call = SyncCall.new(Thread.mailbox, meth, args, block)

  begin
    mailbox << call
  rescue MailboxError
    raise DeadActorError, "attempted to call a dead actor"
  end

  if Celluloid.actor? and not Celluloid.exclusive?
    # The current task will be automatically resumed when we get a response
    Task.suspend(:callwait).value
  else
    # Otherwise we're inside a normal thread, so block
    response = Thread.mailbox.receive do |msg|
      msg.respond_to?(:call) and msg.call == call
    end

    response.value
  end
end
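The difference between `.async` and `.call` when the target is dead can be illustrated with a standard-library `Queue` standing in for the mailbox. This is a sketch under assumptions: `sync_call`, `async_call` and the locally defined `DeadActorError` are hypothetical stand-ins, and a closed `Queue` (which raises `ClosedQueueError` on push) plays the role of a mailbox that raises `MailboxError`.

```ruby
class DeadActorError < StandardError; end   # local stand-in for illustration

# Synchronous call: a dead mailbox becomes a DeadActorError for the caller.
def sync_call(mailbox, meth, *args)
  reply = Queue.new
  begin
    mailbox << [meth, args, reply]
  rescue ClosedQueueError
    raise DeadActorError, "attempted to call a dead actor"
  end
  reply.pop
end

# Asynchronous call: a dead mailbox is silently ignored.
def async_call(mailbox, meth, *args)
  mailbox << [meth, args, nil]
  nil
rescue ClosedQueueError
  nil
end

mailbox = Queue.new
worker = Thread.new do
  # Queue#pop returns nil once the queue is closed and empty, ending the loop.
  while (msg = mailbox.pop)
    meth, args, reply = msg
    reply << "hello".public_send(meth, *args) if reply
  end
end

puts sync_call(mailbox, :upcase)
mailbox.close
worker.join

async_call(mailbox, :upcase)          # swallowed, returns nil
begin
  sync_call(mailbox, :upcase)
rescue DeadActorError => e
  puts e.message
end
```

A caller that needs to know about failures of asynchronous work has to learn about them some other way, which is what the source comment means by linking and supervisors.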
.current ⇒ Object
Obtain the current actor
# File 'lib/celluloid/actor.rb', line 27
def current
  actor = Thread.current[:actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.proxy
end
.future(mailbox, meth, *args, &block) ⇒ Object
Call a method asynchronously and retrieve its value later
# File 'lib/celluloid/actor.rb', line 76
def future(mailbox, meth, *args, &block)
  future = Future.new
  future.execute(mailbox, meth, args, block)
  future
end
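The future pattern (start the work now, collect the value later) can be sketched with a bare thread. `TinyFuture` is a hypothetical class for illustration only; Celluloid's `Future` dispatches the call through the actor's mailbox rather than spawning a thread per call.

```ruby
# Hypothetical minimal future: runs the block in the background and
# hands back the result (or re-raises the error) when #value is called.
class TinyFuture
  def initialize(&block)
    @thread = Thread.new do
      begin
        [:ok, block.call]
      rescue StandardError => e
        [:error, e]          # capture the error instead of killing the thread
      end
    end
  end

  # Blocks until the computation has finished.
  def value
    status, payload = @thread.value
    raise payload if status == :error
    payload
  end
end

future = TinyFuture.new { 6 * 7 }
# ... the caller is free to do other work here ...
puts future.value
```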
.name ⇒ Object
Obtain the name of the current actor
# File 'lib/celluloid/actor.rb', line 34
def name
  actor = Thread.current[:actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.name
end
Instance Method Details
#after(interval) ⇒ Object
Schedule a block to run once after the given interval
# File 'lib/celluloid/actor.rb', line 197
def after(interval)
  @timers.add(interval) do
    Task.new(:timer) { yield }.resume
  end
end
#cleanup(exit_event) ⇒ Object
Clean up after this actor
# File 'lib/celluloid/actor.rb', line 271
def cleanup(exit_event)
  @mailbox.shutdown
  @links.send_event exit_event
  tasks.each { |task| task.terminate }
rescue => ex
  Logger.crash("#{@subject.class}: CLEANUP CRASHED!", ex)
end
#every(interval) ⇒ Object
Schedule a block to run repeatedly at the given interval
# File 'lib/celluloid/actor.rb', line 204
def every(interval)
  @timers.add(interval, true) do
    Task.new(:timer) { yield }.resume
  end
end
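The one-shot versus recurring distinction between `#after` and `#every` can be sketched with a small timer list. Everything here is an assumption made for illustration: `TinyTimers` is hypothetical, and the explicit `now` argument replaces the wall clock so the example is deterministic. It is not the `Timers` class used by the source.

```ruby
# Hypothetical timer list driven by an explicit clock value.
class TinyTimers
  Timer = Struct.new(:fire_at, :interval, :recurring, :block)

  def initialize
    @timers = []
  end

  # Register a block to fire `interval` seconds after `now`.
  def add(interval, recurring = false, now: 0.0, &block)
    @timers << Timer.new(now + interval, interval, recurring, block)
  end

  # Seconds until the earliest timer is due, or nil when none are pending.
  def wait_interval(now)
    earliest = @timers.map(&:fire_at).min
    earliest && [earliest - now, 0].max
  end

  # Run every timer that is due; recurring timers are rescheduled.
  def fire(now)
    due, @timers = @timers.partition { |t| t.fire_at <= now }
    due.each do |t|
      t.block.call
      next unless t.recurring
      t.fire_at += t.interval
      @timers << t
    end
  end
end

timers = TinyTimers.new
log = []
timers.add(1.0)       { log << :once }   # like #after
timers.add(2.0, true) { log << :tick }   # like #every

timers.fire(1.0)
timers.fire(2.0)
timers.fire(4.0)
p log
p timers.wait_interval(4.0)
```

The `wait_interval` value is what an actor loop would use as its mailbox receive timeout, so the loop wakes up in time for the next timer.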
#exclusive ⇒ Object
Execute a code block in exclusive mode.
# File 'lib/celluloid/actor.rb', line 121
def exclusive
  @exclusive = true
  yield
ensure
  @exclusive = false
end
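The `ensure` clause is what guarantees the flag is cleared even when the block raises. A standalone sketch of the same shape, with a hypothetical `ModeFlag` class standing in for the actor:

```ruby
# Hypothetical holder for the flag, mirroring the method shown above.
class ModeFlag
  def initialize
    @exclusive = false
  end

  def exclusive?
    @exclusive
  end

  def exclusive
    @exclusive = true
    yield
  ensure
    @exclusive = false   # runs on normal return and on exceptions alike
  end
end

flag = ModeFlag.new
flag.exclusive { puts flag.exclusive? }   # true inside the block
puts flag.exclusive?                      # false afterwards

begin
  flag.exclusive { raise "boom" }
rescue RuntimeError
  puts flag.exclusive?                    # still false after an error
end
```

One consequence of resetting to `false` unconditionally, at least in this sketch, is that a nested `exclusive` call clears the flag for the remainder of the outer block.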
#exclusive? ⇒ Boolean
Is this actor running in exclusive mode?
# File 'lib/celluloid/actor.rb', line 116
def exclusive?
  @exclusive
end
#handle_crash(exception) ⇒ Object
Handle any exceptions that occur within a running actor
# File 'lib/celluloid/actor.rb', line 247
def handle_crash(exception)
  Logger.crash("#{@subject.class} crashed!", exception)
  shutdown ExitEvent.new(@proxy, exception)
rescue => ex
  Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex)
end
#handle_exit_event(exit_event) ⇒ Object
Handle exit events received by this actor
# File 'lib/celluloid/actor.rb', line 235
def handle_exit_event(exit_event)
  exit_handler = @subject.class.exit_handler
  if exit_handler
    return @subject.send(exit_handler, exit_event.actor, exit_event.reason)
  end

  # Reraise exceptions from linked actors
  # If no reason is given, actor terminated cleanly
  raise exit_event.reason if exit_event.reason
end
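The three outcomes of the method above (delegate to a handler, re-raise the crash, or do nothing on a clean exit) can be exercised in isolation. In this sketch `ExitNotice`, `handle_exit` and `Supervisor` are hypothetical names, and the handler name is passed in directly instead of being read from the subject's class.

```ruby
ExitNotice = Struct.new(:actor, :reason)   # stand-in for an exit event

# Same decision order as the source: handler first, then re-raise, else nothing.
def handle_exit(subject, handler_name, notice)
  if handler_name
    return subject.send(handler_name, notice.actor, notice.reason)
  end
  raise notice.reason if notice.reason
  nil
end

class Supervisor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def actor_died(actor, reason)
    @seen << [actor, reason && reason.message]
  end
end

supervisor = Supervisor.new
crash = ExitNotice.new("worker-1", RuntimeError.new("disk full"))

handle_exit(supervisor, :actor_died, crash)   # handled, nothing raised
p supervisor.seen

begin
  handle_exit(Object.new, nil, crash)         # no handler: crash propagates
rescue RuntimeError => e
  puts "re-raised: #{e.message}"
end

p handle_exit(Object.new, nil, ExitNotice.new("worker-2", nil))   # clean exit
```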
#handle_message(message) ⇒ Object
Handle an incoming message
# File 'lib/celluloid/actor.rb', line 222
def handle_message(message)
  case message
  when Call
    Task.new(:message_handler) { message.dispatch(@subject) }.resume
  when Response
    message.call.task.resume
  else
    @receivers.handle_message(message)
  end
end
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message
# File 'lib/celluloid/actor.rb', line 144
def receive(timeout = nil, &block)
  @receivers.receive(timeout, &block)
end
#run ⇒ Object
Run the actor loop
# File 'lib/celluloid/actor.rb', line 149
def run
  begin
    while @running
      begin
        message = @mailbox.receive(timeout)
      rescue ExitEvent => exit_event
        Task.new(:exit_handler) { handle_exit_event exit_event }.resume
        retry
      rescue NamingRequest => ex
        @name = ex.name
        retry
      rescue TerminationRequest
        break
      end

      if message
        handle_message message
      else
        # No message indicates a timeout
        @timers.fire
        @receivers.fire_timers
      end
    end
  rescue MailboxShutdown
    # If the mailbox detects shutdown, exit the actor
  end

  shutdown
rescue Exception => ex
  handle_crash(ex)
  raise unless ex.is_a? StandardError
end
#run_finalizer ⇒ Object
Run the user-defined finalizer, if one is set
# File 'lib/celluloid/actor.rb', line 264
def run_finalizer
  @subject.finalize if @subject.respond_to? :finalize
rescue => ex
  Logger.crash("#{@subject.class}#finalize crashed!", ex)
end
#shutdown(exit_event = ExitEvent.new(@proxy)) ⇒ Object
Handle cleaning up this actor after it exits
# File 'lib/celluloid/actor.rb', line 255
def shutdown(exit_event = ExitEvent.new(@proxy))
  run_finalizer
  cleanup exit_event
ensure
  Thread.current[:actor] = nil
  Thread.current[:mailbox] = nil
end
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods
# File 'lib/celluloid/actor.rb', line 134
def signal(name, value = nil)
  @signals.send name, value
end
#sleep(interval) ⇒ Object
Sleep for the given amount of time
# File 'lib/celluloid/actor.rb', line 211
def sleep(interval)
  if Celluloid.exclusive?
    Kernel.sleep(interval)
  else
    task = Task.current
    @timers.add(interval) { task.resume }
    Task.suspend :sleeping
  end
end
#terminate ⇒ Object
Terminate this actor
# File 'lib/celluloid/actor.rb', line 129
def terminate
  @running = false
end
#timeout ⇒ Object
How long to wait until the next timer fires
# File 'lib/celluloid/actor.rb', line 183
def timeout
  i1 = @timers.wait_interval
  i2 = @receivers.wait_interval

  if i1 and i2
    i1 < i2 ? i1 : i2
  elsif i1
    i1
  else
    i2
  end
end
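The branching above picks the smaller of two intervals, either of which may be `nil`, and yields `nil` when both are. For reference, the same rule can be written more compactly; `next_timeout` is a hypothetical helper name, not part of the library.

```ruby
# Smallest non-nil interval, or nil when neither source has a pending timer.
def next_timeout(i1, i2)
  [i1, i2].compact.min
end

p next_timeout(nil, nil)
p next_timeout(1.5, nil)
p next_timeout(nil, 0.25)
p next_timeout(3, 2)
```

A `nil` result corresponds to waiting on the mailbox with no timeout at all.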
#wait(name) ⇒ Object
Wait for the given signal
# File 'lib/celluloid/actor.rb', line 139
def wait(name)
  @signals.wait name
end
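How `#wait` and `#signal` pair up can be sketched with Ruby fibers: a waiter parks itself under a signal name, and a later signal resumes every parked waiter with the given value. `TinySignals` and `broadcast` are hypothetical names, and using raw fibers is an assumption made for this sketch; the source delegates to its own `Signals` and `Task` classes.

```ruby
require "fiber"   # Fiber.current needs this on older Rubies

# Hypothetical signal table keyed by name.
class TinySignals
  def initialize
    @waiting = Hash.new { |hash, name| hash[name] = [] }
  end

  # Must be called inside a Fiber: parks it until the named signal arrives,
  # then returns the value that was sent.
  def wait(name)
    @waiting[name] << Fiber.current
    Fiber.yield
  end

  # Resume every fiber waiting on `name`; returns how many were woken.
  def broadcast(name, value = nil)
    fibers = @waiting.delete(name) || []
    fibers.each { |fiber| fiber.resume(value) }
    fibers.size
  end
end

signals = TinySignals.new
results = []

waiter = Fiber.new { results << signals.wait(:ready) }
waiter.resume                      # runs until wait parks the fiber

p results                          # nothing received yet
signals.broadcast(:ready, 42)
p results                          # the waiter resumed with the value
```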