Module: Celluloid
- Extended by:
- Celluloid, Forwardable
- Included in:
- Celluloid, IncidentReporter, Notifications::Fanout, Probe, Supervision::Container, Supervision::Container::Pool
- Defined in:
- lib/celluloid/probe.rb,
lib/celluloid.rb,
lib/celluloid/cell.rb,
lib/celluloid/task.rb,
lib/celluloid/actor.rb,
lib/celluloid/calls.rb,
lib/celluloid/group.rb,
lib/celluloid/future.rb,
lib/celluloid/thread.rb,
lib/celluloid/mailbox.rb,
lib/celluloid/proxies.rb,
lib/celluloid/version.rb,
lib/celluloid/call/sync.rb,
lib/celluloid/condition.rb,
lib/celluloid/call/async.rb,
lib/celluloid/call/block.rb,
lib/celluloid/exceptions.rb,
lib/celluloid/group/pool.rb,
lib/celluloid/actor/system.rb,
lib/celluloid/task/fibered.rb,
lib/celluloid/group/spawner.rb,
lib/celluloid/notifications.rb,
lib/celluloid/system_events.rb,
lib/celluloid/task/threaded.rb,
lib/celluloid/internals/uuid.rb,
lib/celluloid/internals/links.rb,
lib/celluloid/internals/stack.rb,
lib/celluloid/mailbox/evented.rb,
lib/celluloid/internals/logger.rb,
lib/celluloid/internals/method.rb,
lib/celluloid/logging/incident.rb,
lib/celluloid/internals/signals.rb,
lib/celluloid/logging/log_event.rb,
lib/celluloid/internals/handlers.rb,
lib/celluloid/internals/registry.rb,
lib/celluloid/internals/task_set.rb,
lib/celluloid/internals/receivers.rb,
lib/celluloid/internals/responses.rb,
lib/celluloid/logging/ring_buffer.rb,
lib/celluloid/supervision/service.rb,
lib/celluloid/supervision/version.rb,
lib/celluloid/internals/call_chain.rb,
lib/celluloid/internals/properties.rb,
lib/celluloid/internals/stack/dump.rb,
lib/celluloid/internals/cpu_counter.rb,
lib/celluloid/supervision/constants.rb,
lib/celluloid/supervision/container.rb,
lib/celluloid/supervision/supervise.rb,
lib/celluloid/internals/stack/states.rb,
lib/celluloid/supervision/validation.rb,
lib/celluloid/internals/stack/summary.rb,
lib/celluloid/internals/thread_handle.rb,
lib/celluloid/logging/incident_logger.rb,
lib/celluloid/logging/incident_reporter.rb,
lib/celluloid/supervision/configuration.rb,
lib/celluloid/supervision/container/pool.rb,
lib/celluloid/supervision/container/behavior.rb,
lib/celluloid/supervision/container/instance.rb,
lib/celluloid/supervision/container/injections.rb,
lib/celluloid/supervision/configuration/instance.rb,
lib/celluloid/supervision/container/behavior/pool.rb,
lib/celluloid/supervision/container/behavior/tree.rb,
lib/celluloid/supervision/configuration/injections.rb
Overview
collect together all instances of the ‘supervise` method
Defined Under Namespace
Modules: ClassMethods, Feature, InstanceMethods, Internals, Notifications, Proxy, Supervision Classes: AbortError, Actor, Call, Cell, Condition, ConditionError, DeadActorError, DeadTaskError, Error, ExitEvent, Future, Group, Incident, IncidentLogger, IncidentReporter, Interruption, LinkingRequest, LinkingResponse, LogEvent, Mailbox, MailboxDead, MailboxShutdown, NamingRequest, NotActive, NotActorError, NotTaskError, Probe, RingBuffer, SignalConditionRequest, StillActive, SystemEvent, Task, TaskTerminated, TaskTimeout, TerminationRequest, Thread, ThreadLeak, TimedOut
Constant Summary collapse
- LINKING_TIMEOUT =
Linking times out after 5 seconds
5
- BARE_OBJECT_WARNING_MESSAGE =
Warning message added to Celluloid objects accessed outside their actors
"WARNING: BARE CELLULOID OBJECT ".freeze
- OWNER_IVAR =
reference to owning actor
:@celluloid_owner
- VERSION =
"0.18.0".freeze
Class Attribute Summary collapse
- .actor_system ⇒ Object
-
.group_class ⇒ Object
Default internal thread group to use.
-
.log_actor_crashes ⇒ Object
Returns the value of attribute log_actor_crashes.
-
.logger ⇒ Object
Thread-safe logger class.
-
.shutdown_timeout ⇒ Object
How long actors have to terminate.
-
.task_class ⇒ Object
Default task type to use.
Class Method Summary collapse
-
.actor? ⇒ Boolean
Are we currently inside of an actor?.
- .boot ⇒ Object
-
.cores ⇒ Object
(also: cpus, ncpus)
Obtain the number of CPUs in the system.
-
.detect_recursion ⇒ Object
Detect if a particular call is recursing through multiple actors.
-
.exception_handler(&block) ⇒ Object
Define an exception handler for actor crashes.
- .included(klass) ⇒ Object
- .init ⇒ Object
-
.mailbox ⇒ Object
Retrieve the mailbox for the current thread or lazily initialize it.
- .public_registry ⇒ Object
- .publish(*args) ⇒ Object
-
.register_shutdown ⇒ Object
de TODO Anticipate outside process finalizer that would by-pass this.
- .running? ⇒ Boolean
-
.shutdown ⇒ Object
Shut down all running actors.
-
.stack_dump(output = STDERR) ⇒ Object
(also: dump)
Perform a stack dump of all actors to the given output object.
-
.stack_summary(output = STDERR) ⇒ Object
(also: summarize)
Perform a stack summary of all actors to the given output object.
- .start ⇒ Object
- .supervise(config = {}, &block) ⇒ Object
- .suspend(status, waiter) ⇒ Object
-
.uuid ⇒ Object
Generate a Universally Unique Identifier.
- .version ⇒ Object
Instance Method Summary collapse
-
#abort(cause) ⇒ Object
Raise an exception in sender context, but stay running.
-
#after(interval, &block) ⇒ Object
Call a block after a given interval, returning a Celluloid::Timer object.
-
#async(meth = nil, *args, &block) ⇒ Object
Handle async calls within an actor itself.
-
#call_chain_id ⇒ Object
Obtain the UUID of the current call chain.
-
#current_actor ⇒ Object
Obtain the current_actor.
-
#defer(&block) ⇒ Object
Perform a blocking or computationally intensive action inside an asynchronous group of threads, allowing the sender to continue processing other messages in its mailbox in the meantime.
-
#every(interval, &block) ⇒ Object
Call a block every given interval, returning a Celluloid::Timer object.
-
#exclusive(&block) ⇒ Object
Run given block in an exclusive mode: all synchronous calls block the whole actor, not only current message processing.
-
#exclusive? ⇒ Boolean
Are we currently exclusive.
-
#future(meth = nil, *args, &block) ⇒ Object
Handle calls to future within an actor itself.
-
#link(actor) ⇒ Object
Link this actor to another, allowing it to crash or react to errors.
-
#linked_to?(actor) ⇒ Boolean
Is this actor linked to another?.
-
#links ⇒ Object
Obtain the Celluloid::Links for this actor.
-
#monitor(actor) ⇒ Object
Watch for exit events from another actor.
-
#monitoring?(actor) ⇒ Boolean
Are we monitoring another actor?.
-
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message via the actor protocol.
-
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods.
-
#sleep(interval) ⇒ Object
Sleep letting the actor continue processing messages.
-
#tasks ⇒ Object
Obtain the running tasks for this actor.
-
#terminate ⇒ Object
Terminate this actor.
-
#timeout(duration) ⇒ Object
Timeout on task suspension (eg Sync calls to other actors).
-
#unlink(actor) ⇒ Object
Remove links to another actor.
-
#unmonitor(actor) ⇒ Object
Stop waiting for exit events from another actor.
-
#wait(name) ⇒ Object
Wait for the given signal.
Class Attribute Details
.actor_system ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/celluloid.rb', line 34 def actor_system if Thread.current.celluloid? Thread.current[:celluloid_actor_system] || raise(Error, "actor system not running") else Thread.current[:celluloid_actor_system] || @actor_system || raise(Error, "Celluloid is not yet started; use Celluloid.boot") end end |
.group_class ⇒ Object
Default internal thread group to use
30 31 32 |
# File 'lib/celluloid.rb', line 30 def group_class @group_class end |
.log_actor_crashes ⇒ Object
Returns the value of attribute log_actor_crashes.
29 30 31 |
# File 'lib/celluloid.rb', line 29 def log_actor_crashes @log_actor_crashes end |
.logger ⇒ Object
Thread-safe logger class
28 29 30 |
# File 'lib/celluloid.rb', line 28 def logger @logger end |
.shutdown_timeout ⇒ Object
How long actors have to terminate
32 33 34 |
# File 'lib/celluloid.rb', line 32 def shutdown_timeout @shutdown_timeout end |
.task_class ⇒ Object
Default task type to use
31 32 33 |
# File 'lib/celluloid.rb', line 31 def task_class @task_class end |
Class Method Details
.actor? ⇒ Boolean
Are we currently inside of an actor?
91 92 93 |
# File 'lib/celluloid.rb', line 91 def actor? !!Thread.current[:celluloid_actor] end |
.boot ⇒ Object
155 156 157 158 |
# File 'lib/celluloid.rb', line 155 def boot init start end |
.cores ⇒ Object Also known as: cpus, ncpus
Obtain the number of CPUs in the system
106 107 108 |
# File 'lib/celluloid.rb', line 106 def cores Internals::CPUCounter.cores end |
.detect_recursion ⇒ Object
Detect if a particular call is recursing through multiple actors
129 130 131 132 133 134 135 136 137 138 |
# File 'lib/celluloid.rb', line 129 def detect_recursion actor = Thread.current[:celluloid_actor] return unless actor task = Thread.current[:celluloid_task] return unless task chain_id = Internals::CallChain.current_id actor.tasks.to_a.any? { |t| t != task && t.chain_id == chain_id } end |
.exception_handler(&block) ⇒ Object
Define an exception handler for actor crashes
141 142 143 |
# File 'lib/celluloid.rb', line 141 def exception_handler(&block) Internals::Logger.exception_handler(&block) end |
.included(klass) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/celluloid.rb', line 44 def included(klass) klass.send :extend, ClassMethods klass.send :include, InstanceMethods klass.send :extend, Internals::Properties klass.property :mailbox_class, default: Celluloid::Mailbox klass.property :proxy_class, default: Celluloid::Proxy::Cell klass.property :task_class, default: Celluloid.task_class klass.property :group_class, default: Celluloid.group_class klass.property :mailbox_size klass.property :exclusive_actor, default: false klass.property :exclusive_methods, multi: true klass.property :execute_block_on_receiver, default: %i[after every receive], multi: true klass.property :finalizer klass.property :exit_handler_name singleton = class << klass; self; end begin singleton.send(:remove_method, :trap_exit) rescue nil end begin singleton.send(:remove_method, :exclusive) rescue nil end singleton.send(:define_method, :trap_exit) do |*args| exit_handler_name(*args) end singleton.send(:define_method, :exclusive) do |*args| if args.any? exclusive_methods(*exclusive_methods, *args) else exclusive_actor true end end end |
.init ⇒ Object
160 161 162 |
# File 'lib/celluloid.rb', line 160 def init @actor_system ||= Actor::System.new end |
.mailbox ⇒ Object
Retrieve the mailbox for the current thread or lazily initialize it
96 97 98 |
# File 'lib/celluloid.rb', line 96 def mailbox Thread.current[:celluloid_mailbox] ||= Celluloid::Mailbox.new end |
.public_registry ⇒ Object
124 125 126 |
# File 'lib/celluloid.rb', line 124 def public_registry actor_system.public_registry end |
.publish(*args) ⇒ Object
92 93 94 |
# File 'lib/celluloid/notifications.rb', line 92 def self.publish(*args) Notifications.publish(*args) end |
.register_shutdown ⇒ Object
de TODO Anticipate outside process finalizer that would by-pass this.
175 176 177 178 179 180 181 182 |
# File 'lib/celluloid.rb', line 175 def register_shutdown return if defined?(@shutdown_registered) && @shutdown_registered # Terminate all actors at exit, unless the exit is abnormal. at_exit do Celluloid.shutdown unless $ERROR_INFO end @shutdown_registered = true end |
.running? ⇒ Boolean
168 169 170 171 172 |
# File 'lib/celluloid.rb', line 168 def running? actor_system && actor_system.running? rescue Error false end |
.shutdown ⇒ Object
Shut down all running actors
185 186 187 188 |
# File 'lib/celluloid.rb', line 185 def shutdown actor_system.shutdown @actor_system = nil end |
.stack_dump(output = STDERR) ⇒ Object Also known as: dump
Perform a stack dump of all actors to the given output object
113 114 115 |
# File 'lib/celluloid.rb', line 113 def stack_dump(output = STDERR) actor_system.stack_dump.print(output) end |
.stack_summary(output = STDERR) ⇒ Object Also known as: summarize
Perform a stack summary of all actors to the given output object
119 120 121 |
# File 'lib/celluloid.rb', line 119 def stack_summary(output = STDERR) actor_system.stack_summary.print(output) end |
.start ⇒ Object
164 165 166 |
# File 'lib/celluloid.rb', line 164 def start actor_system.start end |
.supervise(config = {}, &block) ⇒ Object
4 5 6 7 |
# File 'lib/celluloid/supervision/supervise.rb', line 4 def supervise(config = {}, &block) supervisor = Supervision.router(config) supervisor.supervise(config, &block) end |
.suspend(status, waiter) ⇒ Object
145 146 147 148 149 150 151 152 153 |
# File 'lib/celluloid.rb', line 145 def suspend(status, waiter) task = Thread.current[:celluloid_task] if task && !Celluloid.exclusive? waiter.before_suspend(task) if waiter.respond_to?(:before_suspend) Task.suspend(status) else waiter.wait end end |
.uuid ⇒ Object
Generate a Universally Unique Identifier
101 102 103 |
# File 'lib/celluloid.rb', line 101 def uuid Internals::UUID.generate end |
.version ⇒ Object
190 191 192 |
# File 'lib/celluloid.rb', line 190 def version VERSION end |
Instance Method Details
#abort(cause) ⇒ Object
Raise an exception in sender context, but stay running
320 321 322 323 324 325 326 327 328 |
# File 'lib/celluloid.rb', line 320 def abort(cause) cause = case cause when String then RuntimeError.new(cause) when Exception then cause else raise TypeError, "Exception object/String expected, but #{cause.class} received" end raise AbortError, cause end |
#after(interval, &block) ⇒ Object
Call a block after a given interval, returning a Celluloid::Timer object
435 436 437 |
# File 'lib/celluloid.rb', line 435 def after(interval, &block) Thread.current[:celluloid_actor].after(interval, &block) end |
#async(meth = nil, *args, &block) ⇒ Object
Handle async calls within an actor itself
454 455 456 |
# File 'lib/celluloid.rb', line 454 def async(meth = nil, *args, &block) Thread.current[:celluloid_actor].behavior_proxy.async meth, *args, &block end |
#call_chain_id ⇒ Object
Obtain the UUID of the current call chain
351 352 353 |
# File 'lib/celluloid.rb', line 351 def call_chain_id Internals::CallChain.current_id end |
#current_actor ⇒ Object
Obtain the current_actor
346 347 348 |
# File 'lib/celluloid.rb', line 346 def current_actor Actor.current end |
#defer(&block) ⇒ Object
Perform a blocking or computationally intensive action inside an asynchronous group of threads, allowing the sender to continue processing other messages in its mailbox in the meantime
447 448 449 450 451 |
# File 'lib/celluloid.rb', line 447 def defer(&block) # This implementation relies on the present implementation of # Celluloid::Future, which uses a thread from InternalPool to run the block Future.new(&block).value end |
#every(interval, &block) ⇒ Object
Call a block every given interval, returning a Celluloid::Timer object
440 441 442 |
# File 'lib/celluloid.rb', line 440 def every(interval, &block) Thread.current[:celluloid_actor].every(interval, &block) end |
#exclusive(&block) ⇒ Object
Run given block in an exclusive mode: all synchronous calls block the whole actor, not only current message processing.
424 425 426 |
# File 'lib/celluloid.rb', line 424 def exclusive(&block) Thread.current[:celluloid_task].exclusive(&block) end |
#exclusive? ⇒ Boolean
Are we currently exclusive
429 430 431 432 |
# File 'lib/celluloid.rb', line 429 def exclusive? task = Thread.current[:celluloid_task] task && task.exclusive? end |
#future(meth = nil, *args, &block) ⇒ Object
Handle calls to future within an actor itself
459 460 461 |
# File 'lib/celluloid.rb', line 459 def future(meth = nil, *args, &block) Thread.current[:celluloid_actor].behavior_proxy.future meth, *args, &block end |
#link(actor) ⇒ Object
Link this actor to another, allowing it to crash or react to errors
376 377 378 |
# File 'lib/celluloid.rb', line 376 def link(actor) Actor.link(actor) end |
#linked_to?(actor) ⇒ Boolean
Is this actor linked to another?
391 392 393 |
# File 'lib/celluloid.rb', line 391 def linked_to?(actor) Actor.linked_to?(actor) end |
#links ⇒ Object
Obtain the Celluloid::Links for this actor
361 362 363 |
# File 'lib/celluloid.rb', line 361 def links Thread.current[:celluloid_actor].links end |
#monitor(actor) ⇒ Object
Watch for exit events from another actor
366 367 368 |
# File 'lib/celluloid.rb', line 366 def monitor(actor) Actor.monitor(actor) end |
#monitoring?(actor) ⇒ Boolean
Are we monitoring another actor?
386 387 388 |
# File 'lib/celluloid.rb', line 386 def monitoring?(actor) Actor.monitoring?(actor) end |
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message via the actor protocol
396 397 398 399 400 401 402 403 |
# File 'lib/celluloid.rb', line 396 def receive(timeout = nil, &block) actor = Thread.current[:celluloid_actor] if actor actor.receive(timeout, &block) else Celluloid.mailbox.receive(timeout, &block) end end |
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods
336 337 338 |
# File 'lib/celluloid.rb', line 336 def signal(name, value = nil) Thread.current[:celluloid_actor].signal name, value end |
#sleep(interval) ⇒ Object
Sleep letting the actor continue processing messages
406 407 408 409 410 411 412 413 |
# File 'lib/celluloid.rb', line 406 def sleep(interval) actor = Thread.current[:celluloid_actor] if actor actor.sleep(interval) else Kernel.sleep interval end end |
#tasks ⇒ Object
Obtain the running tasks for this actor
356 357 358 |
# File 'lib/celluloid.rb', line 356 def tasks Thread.current[:celluloid_actor].tasks.to_a end |
#terminate ⇒ Object
Terminate this actor
331 332 333 |
# File 'lib/celluloid.rb', line 331 def terminate Thread.current[:celluloid_actor].behavior_proxy.terminate! end |
#timeout(duration) ⇒ Object
Timeout on task suspension (eg Sync calls to other actors)
416 417 418 419 420 |
# File 'lib/celluloid.rb', line 416 def timeout(duration) Thread.current[:celluloid_actor].timeout(duration) do yield end end |
#unlink(actor) ⇒ Object
Remove links to another actor
381 382 383 |
# File 'lib/celluloid.rb', line 381 def unlink(actor) Actor.unlink(actor) end |
#unmonitor(actor) ⇒ Object
Stop waiting for exit events from another actor
371 372 373 |
# File 'lib/celluloid.rb', line 371 def unmonitor(actor) Actor.unmonitor(actor) end |
#wait(name) ⇒ Object
Wait for the given signal
341 342 343 |
# File 'lib/celluloid.rb', line 341 def wait(name) Thread.current[:celluloid_actor].wait name end |