Class: Celluloid::Actor
- Inherits:
-
Object
- Object
- Celluloid::Actor
- Extended by:
- Forwardable
- Defined in:
- lib/celluloid/actor.rb,
lib/celluloid/actor/system.rb,
lib/celluloid/system_events.rb
Overview
Actors are Celluloid’s concurrency primitive. They’re implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.
Defined Under Namespace
Instance Attribute Summary collapse
-
#behavior ⇒ Object
readonly
Returns the value of attribute behavior.
-
#exit_handler ⇒ Object
writeonly
Sets the attribute exit_handler.
-
#links ⇒ Object
readonly
Returns the value of attribute links.
-
#mailbox ⇒ Object
readonly
Returns the value of attribute mailbox.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#proxy ⇒ Object
readonly
Returns the value of attribute proxy.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#timers ⇒ Object
readonly
Returns the value of attribute timers.
Class Method Summary collapse
-
.all ⇒ Object
Obtain all running actors in the system.
-
.async(mailbox, meth, *args, &block) ⇒ Object
Invoke a method asynchronously on an actor via its mailbox.
-
.call(mailbox, meth, *args, &block) ⇒ Object
Invoke a method on the given actor via its mailbox.
-
.current ⇒ Object
Obtain the current actor.
-
.future(mailbox, meth, *args, &block) ⇒ Object
Call a method asynchronously and retrieve its value later.
-
.join(actor, timeout = nil) ⇒ Object
Wait for an actor to terminate.
-
.link(actor) ⇒ Object
Link to another actor.
-
.linked_to?(actor) ⇒ Boolean
Are we bidirectionally linked to the given actor?.
-
.monitor(actor) ⇒ Object
Watch for exit events from another actor.
-
.monitoring?(actor) ⇒ Boolean
Are we monitoring the given actor?.
-
.registered_name ⇒ Object
Obtain the name of the current actor.
-
.unlink(actor) ⇒ Object
Unlink from another actor.
-
.unmonitor(actor) ⇒ Object
Stop waiting for exit events from another actor.
Instance Method Summary collapse
-
#after(interval, &block) ⇒ Object
Schedule a block to run at the given time.
- #behavior_proxy ⇒ Object
-
#cleanup(exit_event) ⇒ Object
Clean up after this actor.
- #default_exit_handler(event) ⇒ Object
-
#every(interval, &block) ⇒ Object
Schedule a block to run at the given time.
-
#handle(*patterns, &block) ⇒ Object
Register a new handler for a given pattern.
-
#handle_crash(exception) ⇒ Object
Handle any exceptions that occur within a running actor.
-
#handle_message(message) ⇒ Object
Handle standard low-priority messages.
-
#handle_system_event(event) ⇒ Object
Handle high-priority system event messages.
-
#initialize(behavior, options) ⇒ Actor
constructor
A new instance of Actor.
-
#kill(actor) ⇒ Object
Forcibly kill a given actor.
-
#linking_request(receiver, type) ⇒ Object
Perform a linking request with another actor.
-
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message.
-
#run ⇒ Object
Run the actor loop.
- #setup_thread ⇒ Object
-
#shutdown(exit_event = ExitEvent.new(behavior_proxy)) ⇒ Object
Handle cleaning up this actor after it exits.
-
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods.
-
#sleep(interval) ⇒ Object
Sleep for the given amount of time.
- #start ⇒ Object
-
#task(task_type, meta = nil) ⇒ Object
Run a method inside a task unless it’s exclusive.
-
#terminate ⇒ Object
Terminate this actor.
- #timeout(duration) ⇒ Object
-
#wait(name) ⇒ Object
Wait for the given signal.
Constructor Details
#initialize(behavior, options) ⇒ Actor
Returns a new instance of Actor.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/celluloid/actor.rb', line 102 def initialize(behavior, ) @behavior = behavior @actor_system = .fetch(:actor_system) @mailbox = .fetch(:mailbox_class, Mailbox).new @mailbox.max_size = .fetch(:mailbox_size, nil) @task_class = [:task_class] || Celluloid.task_class @exit_handler = method(:default_exit_handler) @exclusive = .fetch(:exclusive, false) @timers = Timers::Group.new @tasks = Internals::TaskSet.new @links = Internals::Links.new @handlers = Internals::Handlers.new @receivers = Internals::Receivers.new(@timers) @signals = Internals::Signals.new @running = false @name = nil handle(SystemEvent) do || handle_system_event end end |
Instance Attribute Details
#behavior ⇒ Object (readonly)
Returns the value of attribute behavior.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def behavior @behavior end |
#exit_handler=(value) ⇒ Object (writeonly)
Sets the attribute exit_handler
9 10 11 |
# File 'lib/celluloid/actor.rb', line 9 def exit_handler=(value) @exit_handler = value end |
#links ⇒ Object (readonly)
Returns the value of attribute links.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def links @links end |
#mailbox ⇒ Object (readonly)
Returns the value of attribute mailbox.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def mailbox @mailbox end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def name @name end |
#proxy ⇒ Object (readonly)
Returns the value of attribute proxy.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def proxy @proxy end |
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def tasks @tasks end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def thread @thread end |
#timers ⇒ Object (readonly)
Returns the value of attribute timers.
8 9 10 |
# File 'lib/celluloid/actor.rb', line 8 def timers @timers end |
Class Method Details
.all ⇒ Object
Obtain all running actors in the system
49 50 51 |
# File 'lib/celluloid/actor.rb', line 49 def all Celluloid.actor_system.running end |
.async(mailbox, meth, *args, &block) ⇒ Object
Invoke a method asynchronously on an actor via its mailbox
37 38 39 40 |
# File 'lib/celluloid/actor.rb', line 37 def async(mailbox, meth, *args, &block) proxy = Proxy::Async.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end |
.call(mailbox, meth, *args, &block) ⇒ Object
Invoke a method on the given actor via its mailbox
31 32 33 34 |
# File 'lib/celluloid/actor.rb', line 31 def call(mailbox, meth, *args, &block) proxy = Proxy::Sync.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end |
.current ⇒ Object
Obtain the current actor
17 18 19 20 21 |
# File 'lib/celluloid/actor.rb', line 17 def current actor = Thread.current[:celluloid_actor] raise NotActorError, "not in actor scope" unless actor actor.behavior_proxy end |
.future(mailbox, meth, *args, &block) ⇒ Object
Call a method asynchronously and retrieve its value later
43 44 45 46 |
# File 'lib/celluloid/actor.rb', line 43 def future(mailbox, meth, *args, &block) proxy = Proxy::Future.new(mailbox, "UnknownClass") proxy.method_missing(meth, *args, &block) end |
.join(actor, timeout = nil) ⇒ Object
Wait for an actor to terminate
96 97 98 99 |
# File 'lib/celluloid/actor.rb', line 96 def join(actor, timeout = nil) actor.thread.join(timeout) actor end |
.link(actor) ⇒ Object
Link to another actor
66 67 68 69 |
# File 'lib/celluloid/actor.rb', line 66 def link(actor) monitor actor Thread.current[:celluloid_actor].links << actor end |
.linked_to?(actor) ⇒ Boolean
Are we bidirectionally linked to the given actor?
83 84 85 |
# File 'lib/celluloid/actor.rb', line 83 def linked_to?(actor) monitoring?(actor) && Thread.current[:celluloid_actor].links.include?(actor) end |
.monitor(actor) ⇒ Object
Watch for exit events from another actor
54 55 56 57 |
# File 'lib/celluloid/actor.rb', line 54 def monitor(actor) raise NotActorError, "can't link outside actor context" unless Celluloid.actor? Thread.current[:celluloid_actor].linking_request(actor, :link) end |
.monitoring?(actor) ⇒ Boolean
Are we monitoring the given actor?
78 79 80 |
# File 'lib/celluloid/actor.rb', line 78 def monitoring?(actor) actor.links.include? Actor.current end |
.registered_name ⇒ Object
Obtain the name of the current actor
24 25 26 27 28 |
# File 'lib/celluloid/actor.rb', line 24 def registered_name actor = Thread.current[:celluloid_actor] raise NotActorError, "not in actor scope" unless actor actor.name end |
.unlink(actor) ⇒ Object
Unlink from another actor
72 73 74 75 |
# File 'lib/celluloid/actor.rb', line 72 def unlink(actor) unmonitor actor Thread.current[:celluloid_actor].links.delete actor end |
.unmonitor(actor) ⇒ Object
Stop waiting for exit events from another actor
60 61 62 63 |
# File 'lib/celluloid/actor.rb', line 60 def unmonitor(actor) raise NotActorError, "can't link outside actor context" unless Celluloid.actor? Thread.current[:celluloid_actor].linking_request(actor, :unlink) end |
Instance Method Details
#after(interval, &block) ⇒ Object
Schedule a block to run at the given time
244 245 246 |
# File 'lib/celluloid/actor.rb', line 244 def after(interval, &block) @timers.after(interval) { task(:timer, &block) } end |
#behavior_proxy ⇒ Object
142 143 144 |
# File 'lib/celluloid/actor.rb', line 142 def behavior_proxy @behavior.proxy end |
#cleanup(exit_event) ⇒ Object
Clean up after this actor
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/celluloid/actor.rb', line 320 def cleanup(exit_event) # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!! # rubocop:disable Style/GlobalVars Celluloid::Probe.actor_died(self) if $CELLULOID_MONITORING # rubocop:enable Style/GlobalVars @mailbox.shutdown @links.each do |actor| actor.mailbox << exit_event if actor.mailbox.alive? end tasks.to_a.each do |task| begin task.terminate rescue DeadTaskError # TODO: not tested (failed on Travis) end end rescue => ex # TODO: metadata Internals::Logger.crash("CLEANUP CRASHED!", ex) end |
#default_exit_handler(event) ⇒ Object
297 298 299 |
# File 'lib/celluloid/actor.rb', line 297 def default_exit_handler(event) raise event.reason if event.reason end |
#every(interval, &block) ⇒ Object
Schedule a block to run at the given time
249 250 251 |
# File 'lib/celluloid/actor.rb', line 249 def every(interval, &block) @timers.every(interval) { task(:timer, &block) } end |
#handle(*patterns, &block) ⇒ Object
Register a new handler for a given pattern
229 230 231 |
# File 'lib/celluloid/actor.rb', line 229 def handle(*patterns, &block) @handlers.handle(*patterns, &block) end |
#handle_crash(exception) ⇒ Object
Handle any exceptions that occur within a running actor
302 303 304 305 306 307 308 |
# File 'lib/celluloid/actor.rb', line 302 def handle_crash(exception) # TODO: add meta info Internals::Logger.crash("Actor crashed!", exception) shutdown ExitEvent.new(behavior_proxy, exception) rescue => ex Internals::Logger.crash("Actor#handle_crash CRASHED!", ex) end |
#handle_message(message) ⇒ Object
Handle standard low-priority messages
288 289 290 291 292 293 294 295 |
# File 'lib/celluloid/actor.rb', line 288 def () # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!! # rubocop:disable Metrics/LineLength, Style/GlobalVars Internals::Logger.debug "Discarded message (unhandled): #{}" if !@handlers.() && !@receivers.() && $CELLULOID_DEBUG # rubocop:enable Metrics/LineLength, Style/GlobalVars end |
#handle_system_event(event) ⇒ Object
Handle high-priority system event messages
4 5 6 7 8 9 10 11 12 13 |
# File 'lib/celluloid/system_events.rb', line 4 def handle_system_event(event) if handler = SystemEvent.handle(event.class) send(handler, event) else # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!! # rubocop:disable Style/GlobalVars Internals::Logger.debug "Discarded message (unhandled): #{}" if $CELLULOID_DEBUG # rubocop:enable Style/GlobalVars end end |
#kill(actor) ⇒ Object
Forcibly kill a given actor
89 90 91 92 |
# File 'lib/celluloid/actor.rb', line 89 def kill(actor) actor.thread.kill actor.mailbox.shutdown if actor.mailbox.alive? end |
#linking_request(receiver, type) ⇒ Object
Perform a linking request with another actor
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/celluloid/actor.rb', line 184 def linking_request(receiver, type) Celluloid.exclusive do receiver.mailbox << LinkingRequest.new(Actor.current, type) system_events = [] Timers::Wait.for(LINKING_TIMEOUT) do |remaining| begin = @mailbox.receive(remaining) do |msg| msg.is_a?(LinkingResponse) && msg.actor.mailbox.address == receiver.mailbox.address && msg.type == type end rescue TaskTimeout next # IO reactor did something, no message in queue yet. end if .instance_of? LinkingResponse # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!! # rubocop:disable Style/GlobalVars Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING # rubocop:enable Style/GlobalVars system_events.each { |ev| @mailbox << ev } return elsif .is_a? SystemEvent # Queue up pending system events to be processed after we've successfully linked system_events << else raise "Unexpected message type: #{.class}. Expected LinkingResponse, NilClass, SystemEvent." end end raise TaskTimeout, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded with receiver: #{receiver}" end end |
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message
234 235 236 237 238 239 240 241 |
# File 'lib/celluloid/actor.rb', line 234 def receive(timeout = nil, &block) loop do = @receivers.receive(timeout, &block) return unless .is_a?(SystemEvent) handle_system_event() end end |
#run ⇒ Object
Run the actor loop
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/celluloid/actor.rb', line 152 def run while @running begin @timers.wait do |interval| interval = 0 if interval && interval < 0 if = @mailbox.check(interval) () break unless @running end end rescue MailboxShutdown @running = false rescue MailboxDead # TODO: not tests (but fails occasionally in tests) @running = false end end shutdown rescue ::Exception => ex handle_crash(ex) raise unless ex.is_a?(StandardError) || ex.is_a?(Celluloid::Interruption) end |
#setup_thread ⇒ Object
146 147 148 149 |
# File 'lib/celluloid/actor.rb', line 146 def setup_thread Thread.current[:celluloid_actor] = self Thread.current[:celluloid_mailbox] = @mailbox end |
#shutdown(exit_event = ExitEvent.new(behavior_proxy)) ⇒ Object
Handle cleaning up this actor after it exits
311 312 313 314 315 316 317 |
# File 'lib/celluloid/actor.rb', line 311 def shutdown(exit_event = ExitEvent.new(behavior_proxy)) @behavior.shutdown cleanup exit_event ensure Thread.current[:celluloid_actor] = nil Thread.current[:celluloid_mailbox] = nil end |
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods
219 220 221 |
# File 'lib/celluloid/actor.rb', line 219 def signal(name, value = nil) @signals.broadcast name, value end |
#sleep(interval) ⇒ Object
Sleep for the given amount of time
282 283 284 285 |
# File 'lib/celluloid/actor.rb', line 282 def sleep(interval) sleeper = Sleeper.new(@timers, interval) Celluloid.suspend(:sleeping, sleeper) end |
#start ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/celluloid/actor.rb', line 127 def start @running = true @thread = Internals::ThreadHandle.new(@actor_system, :actor) do setup_thread run end @proxy = Proxy::Actor.new(@mailbox, @thread) # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!! # rubocop:disable Style/GlobalVars Celluloid::Probe.actor_created(self) if $CELLULOID_MONITORING # rubocop:enable Style/GlobalVars end |
#task(task_type, meta = nil) ⇒ Object
Run a method inside a task unless it’s exclusive
344 345 346 347 348 349 350 351 352 |
# File 'lib/celluloid/actor.rb', line 344 def task(task_type, = nil) @task_class.new(task_type, ) do if @exclusive Celluloid.exclusive { yield } else yield end end.resume end |
#terminate ⇒ Object
Terminate this actor
179 180 181 |
# File 'lib/celluloid/actor.rb', line 179 def terminate @running = false end |
#timeout(duration) ⇒ Object
253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/celluloid/actor.rb', line 253 def timeout(duration) bt = caller task = Task.current timer = @timers.after(duration) do exception = TaskTimeout.new("execution expired") exception.set_backtrace bt task.resume exception end yield ensure timer.cancel if timer end |
#wait(name) ⇒ Object
Wait for the given signal
224 225 226 |
# File 'lib/celluloid/actor.rb', line 224 def wait(name) @signals.wait name end |