Module: Celluloid

Extended by:
Celluloid, Forwardable
Included in:
Celluloid, IncidentReporter, Notifications::Fanout, Probe, Supervision::Container, Supervision::Container::Pool
Defined in:
lib/celluloid/probe.rb,
lib/celluloid.rb,
lib/celluloid/cell.rb,
lib/celluloid/task.rb,
lib/celluloid/actor.rb,
lib/celluloid/calls.rb,
lib/celluloid/group.rb,
lib/celluloid/future.rb,
lib/celluloid/thread.rb,
lib/celluloid/mailbox.rb,
lib/celluloid/proxies.rb,
lib/celluloid/version.rb,
lib/celluloid/call/sync.rb,
lib/celluloid/condition.rb,
lib/celluloid/call/async.rb,
lib/celluloid/call/block.rb,
lib/celluloid/exceptions.rb,
lib/celluloid/group/pool.rb,
lib/celluloid/actor/system.rb,
lib/celluloid/task/fibered.rb,
lib/celluloid/group/spawner.rb,
lib/celluloid/notifications.rb,
lib/celluloid/system_events.rb,
lib/celluloid/task/threaded.rb,
lib/celluloid/internals/uuid.rb,
lib/celluloid/internals/links.rb,
lib/celluloid/internals/stack.rb,
lib/celluloid/mailbox/evented.rb,
lib/celluloid/internals/logger.rb,
lib/celluloid/internals/method.rb,
lib/celluloid/logging/incident.rb,
lib/celluloid/internals/signals.rb,
lib/celluloid/logging/log_event.rb,
lib/celluloid/internals/handlers.rb,
lib/celluloid/internals/registry.rb,
lib/celluloid/internals/task_set.rb,
lib/celluloid/internals/receivers.rb,
lib/celluloid/internals/responses.rb,
lib/celluloid/logging/ring_buffer.rb,
lib/celluloid/supervision/service.rb,
lib/celluloid/supervision/version.rb,
lib/celluloid/internals/call_chain.rb,
lib/celluloid/internals/properties.rb,
lib/celluloid/internals/stack/dump.rb,
lib/celluloid/internals/cpu_counter.rb,
lib/celluloid/supervision/constants.rb,
lib/celluloid/supervision/container.rb,
lib/celluloid/supervision/supervise.rb,
lib/celluloid/internals/stack/states.rb,
lib/celluloid/supervision/validation.rb,
lib/celluloid/internals/stack/summary.rb,
lib/celluloid/internals/thread_handle.rb,
lib/celluloid/logging/incident_logger.rb,
lib/celluloid/logging/incident_reporter.rb,
lib/celluloid/supervision/configuration.rb,
lib/celluloid/supervision/container/pool.rb,
lib/celluloid/supervision/container/behavior.rb,
lib/celluloid/supervision/container/instance.rb,
lib/celluloid/supervision/container/injections.rb,
lib/celluloid/supervision/configuration/instance.rb,
lib/celluloid/supervision/container/behavior/pool.rb,
lib/celluloid/supervision/container/behavior/tree.rb,
lib/celluloid/supervision/configuration/injections.rb

Overview

Collects together all instances of the `supervise` method.

Defined Under Namespace

Modules: ClassMethods, Feature, InstanceMethods, Internals, Notifications, Proxy, Supervision Classes: AbortError, Actor, Call, Cell, Condition, ConditionError, DeadActorError, DeadTaskError, Error, ExitEvent, Future, Group, Incident, IncidentLogger, IncidentReporter, Interruption, LinkingRequest, LinkingResponse, LogEvent, Mailbox, MailboxDead, MailboxShutdown, NamingRequest, NotActive, NotActorError, NotTaskError, Probe, RingBuffer, SignalConditionRequest, StillActive, SystemEvent, Task, TaskTerminated, TaskTimeout, TerminationRequest, Thread, ThreadLeak, TimedOut

Constant Summary collapse

LINKING_TIMEOUT =

Linking times out after 5 seconds

5
BARE_OBJECT_WARNING_MESSAGE =

Warning message added to Celluloid objects accessed outside their actors

"WARNING: BARE CELLULOID OBJECT ".freeze
OWNER_IVAR =

reference to owning actor

:@celluloid_owner
VERSION =
"0.18.0".freeze

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.actor_systemObject


34
35
36
37
38
39
40
41
42
# File 'lib/celluloid.rb', line 34

# Look up the actor system for the calling thread.
#
# The thread-local :celluloid_actor_system entry always wins. When it is
# unset, threads that answer true to `celluloid?` raise immediately, while
# all other threads fall back to the @actor_system instance variable.
#
# Raises Error when no actor system can be found.
def actor_system
  current = Thread.current

  unless current.celluloid?
    return current[:celluloid_actor_system] ||
           @actor_system ||
           raise(Error, "Celluloid is not yet started; use Celluloid.boot")
  end

  current[:celluloid_actor_system] || raise(Error, "actor system not running")
end

.group_classObject

Default internal thread group to use


30
31
32
# File 'lib/celluloid.rb', line 30

# Reader: returns the current value of the @group_class instance variable.
def group_class
  @group_class
end

.log_actor_crashesObject

Returns the value of attribute log_actor_crashes


29
30
31
# File 'lib/celluloid.rb', line 29

# Reader: returns the current value of the @log_actor_crashes instance variable.
def log_actor_crashes
  @log_actor_crashes
end

.loggerObject

Thread-safe logger class


28
29
30
# File 'lib/celluloid.rb', line 28

# Reader: returns the current value of the @logger instance variable.
def logger
  @logger
end

.shutdown_timeoutObject

How long actors have to terminate


32
33
34
# File 'lib/celluloid.rb', line 32

# Reader: returns the current value of the @shutdown_timeout instance variable.
def shutdown_timeout
  @shutdown_timeout
end

.task_classObject

Default task type to use


31
32
33
# File 'lib/celluloid.rb', line 31

# Reader: returns the current value of the @task_class instance variable.
def task_class
  @task_class
end

Class Method Details

.actor?Boolean

Are we currently inside of an actor?

Returns:

  • (Boolean)

91
92
93
# File 'lib/celluloid.rb', line 91

# Report whether the calling thread has a :celluloid_actor thread-local set.
# Always answers with a strict true/false, never the stored object itself.
def actor?
  Thread.current[:celluloid_actor] ? true : false
end

.bootObject


155
156
157
158
# File 'lib/celluloid.rb', line 155

# Convenience entry point: runs `init` and then `start`, returning whatever
# `start` returns.
def boot
  init
  start
end

.coresObject Also known as: cpus, ncpus

Obtain the number of CPUs in the system


106
107
108
# File 'lib/celluloid.rb', line 106

# Forwards to Internals::CPUCounter.cores and returns its result.
def cores
  Internals::CPUCounter.cores
end

.detect_recursionObject

Detect if a particular call is recursing through multiple actors


129
130
131
132
133
134
135
136
137
138
# File 'lib/celluloid.rb', line 129

# Check whether another task of the current actor shares the current call
# chain id.
#
# Returns nil when the calling thread has no :celluloid_actor or no
# :celluloid_task thread-local; otherwise true/false depending on whether any
# task other than the current one reports the same chain_id.
def detect_recursion
  thread = Thread.current

  owner = thread[:celluloid_actor]
  return unless owner

  running = thread[:celluloid_task]
  return unless running

  chain = Internals::CallChain.current_id
  # Ignore the task we are running in; only sibling tasks count.
  siblings = owner.tasks.to_a.reject { |candidate| candidate == running }
  siblings.any? { |candidate| candidate.chain_id == chain }
end

.exception_handler(&block) ⇒ Object

Define an exception handler for actor crashes


141
142
143
# File 'lib/celluloid.rb', line 141

# Forwards the given block to Internals::Logger.exception_handler and returns
# its result.
def exception_handler(&block)
  Internals::Logger.exception_handler(&block)
end

.included(klass) ⇒ Object


44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/celluloid.rb', line 44

# Hook invoked with the class (or module) that is mixing this module in.
#
# Extends +klass+ with ClassMethods and Internals::Properties, includes
# InstanceMethods, declares the per-class properties listed below, and then
# (re)defines the +trap_exit+ and +exclusive+ singleton helpers on top of
# those properties.
#
# @param klass [Module] the class or module being set up
# @return [Symbol] the result of the final define_method call
def included(klass)
  klass.send :extend,  ClassMethods
  klass.send :include, InstanceMethods

  klass.send :extend, Internals::Properties

  klass.property :mailbox_class, default: Celluloid::Mailbox
  klass.property :proxy_class,   default: Celluloid::Proxy::Cell
  klass.property :task_class,    default: Celluloid.task_class
  klass.property :group_class,   default: Celluloid.group_class
  klass.property :mailbox_size

  klass.property :exclusive_actor, default: false
  klass.property :exclusive_methods, multi: true
  klass.property :execute_block_on_receiver,
                 default: %i[after every receive],
                 multi: true

  klass.property :finalizer
  klass.property :exit_handler_name

  singleton = klass.singleton_class

  # Clear any earlier singleton definitions before redefining them. A method
  # that is not defined there surfaces as NameError, which is the only failure
  # deliberately ignored; rescuing everything would also hide unrelated errors.
  %i[trap_exit exclusive].each do |helper|
    begin
      singleton.send(:remove_method, helper)
    rescue NameError
      nil
    end
  end

  # trap_exit(*args) forwards its arguments to the exit_handler_name property.
  singleton.send(:define_method, :trap_exit) do |*args|
    exit_handler_name(*args)
  end

  # exclusive(:a, :b) re-declares exclusive_methods with the current values
  # plus the new names; a bare `exclusive` sets exclusive_actor to true.
  singleton.send(:define_method, :exclusive) do |*args|
    if args.any?
      exclusive_methods(*exclusive_methods, *args)
    else
      exclusive_actor true
    end
  end
end

.initObject


160
161
162
# File 'lib/celluloid.rb', line 160

# Ensure @actor_system is populated, building an Actor::System only when the
# instance variable is currently nil/false. Returns the stored system.
def init
  @actor_system = Actor::System.new unless @actor_system
  @actor_system
end

.mailboxObject

Retrieve the mailbox for the current thread or lazily initialize it


96
97
98
# File 'lib/celluloid.rb', line 96

# Return the calling thread's :celluloid_mailbox thread-local, creating a
# Celluloid::Mailbox and storing it there first if none is set yet.
def mailbox
  thread = Thread.current
  thread[:celluloid_mailbox] = Celluloid::Mailbox.new unless thread[:celluloid_mailbox]
  thread[:celluloid_mailbox]
end

.public_registryObject


124
125
126
# File 'lib/celluloid.rb', line 124

# Returns the public_registry of whatever `actor_system` resolves to.
def public_registry
  actor_system.public_registry
end

.publish(*args) ⇒ Object


92
93
94
# File 'lib/celluloid/notifications.rb', line 92

# Forwards all arguments unchanged to Notifications.publish.
def self.publish(*args)
  Notifications.publish(*args)
end

.register_shutdownObject

TODO: Anticipate an outside process finalizer that would bypass this.


175
176
177
178
179
180
181
182
# File 'lib/celluloid.rb', line 175

# Install the at_exit hook that shuts Celluloid down, at most once.
#
# Returns nil without doing anything when @shutdown_registered is already
# truthy; otherwise registers the hook and sets the flag to true.
def register_shutdown
  already_registered = defined?(@shutdown_registered) && @shutdown_registered
  return if already_registered

  # Terminate all actors at exit, unless the exit is abnormal.
  at_exit { Celluloid.shutdown unless $ERROR_INFO }

  @shutdown_registered = true
end

.running?Boolean

Returns:

  • (Boolean)

168
169
170
171
172
# File 'lib/celluloid.rb', line 168

# Ask the resolved actor system whether it is running.
#
# Any Error raised while resolving or querying the actor system is treated as
# "not running" and yields false.
def running?
  system = actor_system
  system && system.running?
rescue Error
  false
end

.shutdownObject

Shut down all running actors


185
186
187
188
# File 'lib/celluloid.rb', line 185

# Call `shutdown` on the resolved actor system, then clear @actor_system.
# Returns nil (the value assigned to the instance variable).
def shutdown
  system = actor_system
  system.shutdown
  @actor_system = nil
end

.stack_dump(output = STDERR) ⇒ Object Also known as: dump

Perform a stack dump of all actors to the given output object


113
114
115
# File 'lib/celluloid.rb', line 113

# Obtain a stack dump from the resolved actor system and print it to +output+
# (STDERR by default). Returns whatever the dump's `print` returns.
def stack_dump(output = STDERR)
  dump = actor_system.stack_dump
  dump.print(output)
end

.stack_summary(output = STDERR) ⇒ Object Also known as: summarize

Perform a stack summary of all actors to the given output object


119
120
121
# File 'lib/celluloid.rb', line 119

# Obtain a stack summary from the resolved actor system and print it to
# +output+ (STDERR by default). Returns whatever the summary's `print` returns.
def stack_summary(output = STDERR)
  summary = actor_system.stack_summary
  summary.print(output)
end

.startObject


164
165
166
# File 'lib/celluloid.rb', line 164

# Calls `start` on whatever `actor_system` resolves to and returns its result.
def start
  actor_system.start
end

.supervise(config = {}, &block) ⇒ Object


4
5
6
7
# File 'lib/celluloid/supervision/supervise.rb', line 4

# Ask Supervision.router for the supervisor matching +config+ and hand the
# same config (and optional block) to that supervisor's `supervise`.
def supervise(config = {}, &block)
  Supervision.router(config).supervise(config, &block)
end

.suspend(status, waiter) ⇒ Object


145
146
147
148
149
150
151
152
153
# File 'lib/celluloid.rb', line 145

# Suspend the current task with +status+, or block on +waiter+ directly.
#
# When the thread has no :celluloid_task, or Celluloid.exclusive? is truthy,
# the call simply returns waiter.wait. Otherwise the waiter's optional
# before_suspend hook is given the task and Task.suspend(status) is returned.
def suspend(status, waiter)
  current = Thread.current[:celluloid_task]
  return waiter.wait if !current || Celluloid.exclusive?

  waiter.before_suspend(current) if waiter.respond_to?(:before_suspend)
  Task.suspend(status)
end

.uuidObject

Generate a Universally Unique Identifier


101
102
103
# File 'lib/celluloid.rb', line 101

# Returns the result of Internals::UUID.generate.
def uuid
  Internals::UUID.generate
end

.versionObject


190
191
192
# File 'lib/celluloid.rb', line 190

# Returns the VERSION constant.
def version
  VERSION
end

Instance Method Details

#abort(cause) ⇒ Object

Raise an exception in sender context, but stay running

Raises:

  • (AbortError)


320
321
322
323
324
325
326
327
328
# File 'lib/celluloid.rb', line 320

# Raise an AbortError wrapping +cause+.
#
# A String cause is first turned into a RuntimeError with that message; an
# Exception is used as-is. Anything else is rejected with a TypeError.
def abort(cause)
  error =
    if cause.is_a?(String)
      RuntimeError.new(cause)
    elsif cause.is_a?(Exception)
      cause
    else
      raise TypeError, "Exception object/String expected, but #{cause.class} received"
    end

  raise AbortError, error
end

#after(interval, &block) ⇒ Object

Call a block after a given interval, returning a Celluloid::Timer object


435
436
437
# File 'lib/celluloid.rb', line 435

# Forward +interval+ and the block to the `after` method of the actor stored
# in the calling thread's :celluloid_actor thread-local.
def after(interval, &block)
  actor = Thread.current[:celluloid_actor]
  actor.after(interval, &block)
end

#async(meth = nil, *args, &block) ⇒ Object

Handle async calls within an actor itself


454
455
456
# File 'lib/celluloid.rb', line 454

# Forward an async call to the behavior_proxy of the actor stored in the
# calling thread's :celluloid_actor thread-local.
def async(meth = nil, *args, &block)
  proxy = Thread.current[:celluloid_actor].behavior_proxy
  proxy.async(meth, *args, &block)
end

#call_chain_idObject

Obtain the UUID of the current call chain


351
352
353
# File 'lib/celluloid.rb', line 351

# Returns the result of Internals::CallChain.current_id.
def call_chain_id
  Internals::CallChain.current_id
end

#current_actorObject

Obtain the current_actor


346
347
348
# File 'lib/celluloid.rb', line 346

# Returns the result of Actor.current.
def current_actor
  Actor.current
end

#defer(&block) ⇒ Object

Perform a blocking or computationally intensive action inside an asynchronous group of threads, allowing the sender to continue processing other messages in its mailbox in the meantime


447
448
449
450
451
# File 'lib/celluloid.rb', line 447

# Run the block through a Future and return its value.
def defer(&block)
  # Per the original implementation note: this depends on how
  # Celluloid::Future currently works, namely that it runs the block on a
  # thread taken from InternalPool.
  deferred = Future.new(&block)
  deferred.value
end

#every(interval, &block) ⇒ Object

Call a block every given interval, returning a Celluloid::Timer object


440
441
442
# File 'lib/celluloid.rb', line 440

# Forward +interval+ and the block to the `every` method of the actor stored
# in the calling thread's :celluloid_actor thread-local.
def every(interval, &block)
  actor = Thread.current[:celluloid_actor]
  actor.every(interval, &block)
end

#exclusive(&block) ⇒ Object

Run given block in an exclusive mode: all synchronous calls block the whole actor, not only current message processing.


424
425
426
# File 'lib/celluloid.rb', line 424

# Hand the block to the `exclusive` method of the task stored in the calling
# thread's :celluloid_task thread-local.
def exclusive(&block)
  task = Thread.current[:celluloid_task]
  task.exclusive(&block)
end

#exclusive?Boolean

Are we currently exclusive?

Returns:

  • (Boolean)

429
430
431
432
# File 'lib/celluloid.rb', line 429

# Ask the current task whether it is exclusive.
#
# When the thread has no :celluloid_task, the falsy thread-local value itself
# (normally nil) is returned instead.
def exclusive?
  current = Thread.current[:celluloid_task]
  return current unless current

  current.exclusive?
end

#future(meth = nil, *args, &block) ⇒ Object

Handle calls to future within an actor itself


459
460
461
# File 'lib/celluloid.rb', line 459

# Forward a future call to the behavior_proxy of the actor stored in the
# calling thread's :celluloid_actor thread-local.
def future(meth = nil, *args, &block)
  proxy = Thread.current[:celluloid_actor].behavior_proxy
  proxy.future(meth, *args, &block)
end

#link(actor) ⇒ Object

Link this actor to another, allowing it to crash or react to errors


376
377
378
# File 'lib/celluloid.rb', line 376

# Forwards +actor+ to Actor.link and returns its result.
def link(actor)
  Actor.link(actor)
end

#linked_to?(actor) ⇒ Boolean

Is this actor linked to another?

Returns:

  • (Boolean)

391
392
393
# File 'lib/celluloid.rb', line 391

# Forwards +actor+ to Actor.linked_to? and returns its result.
def linked_to?(actor)
  Actor.linked_to?(actor)
end

#links ⇒ Object

Obtain the Celluloid::Links for this actor


361
362
363
# File 'lib/celluloid.rb', line 361

# Return the `links` of the actor stored in the calling thread's
# :celluloid_actor thread-local.
def links
  actor = Thread.current[:celluloid_actor]
  actor.links
end

#monitor(actor) ⇒ Object

Watch for exit events from another actor


366
367
368
# File 'lib/celluloid.rb', line 366

# Forwards +actor+ to Actor.monitor and returns its result.
def monitor(actor)
  Actor.monitor(actor)
end

#monitoring?(actor) ⇒ Boolean

Are we monitoring another actor?

Returns:

  • (Boolean)

386
387
388
# File 'lib/celluloid.rb', line 386

# Forwards +actor+ to Actor.monitoring? and returns its result.
def monitoring?(actor)
  Actor.monitoring?(actor)
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message via the actor protocol


396
397
398
399
400
401
402
403
# File 'lib/celluloid.rb', line 396

# Receive a message, forwarding +timeout+ and the block.
#
# Uses the actor in the calling thread's :celluloid_actor thread-local when
# one is set; otherwise falls back to Celluloid.mailbox.
def receive(timeout = nil, &block)
  inbox = Thread.current[:celluloid_actor] || Celluloid.mailbox
  inbox.receive(timeout, &block)
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods


336
337
338
# File 'lib/celluloid.rb', line 336

# Forward +name+ and +value+ to the `signal` method of the actor stored in the
# calling thread's :celluloid_actor thread-local.
def signal(name, value = nil)
  actor = Thread.current[:celluloid_actor]
  actor.signal(name, value)
end

#sleep(interval) ⇒ Object

Sleep letting the actor continue processing messages


406
407
408
409
410
411
412
413
# File 'lib/celluloid.rb', line 406

# Sleep for +interval+.
#
# Delegates to the actor in the calling thread's :celluloid_actor thread-local
# when one is set; otherwise falls back to Kernel.sleep.
def sleep(interval)
  actor = Thread.current[:celluloid_actor]
  return actor.sleep(interval) if actor

  Kernel.sleep interval
end

#tasksObject

Obtain the running tasks for this actor


356
357
358
# File 'lib/celluloid.rb', line 356

# Return the tasks of the actor stored in the calling thread's
# :celluloid_actor thread-local, converted to an Array via to_a.
def tasks
  actor = Thread.current[:celluloid_actor]
  actor.tasks.to_a
end

#terminateObject

Terminate this actor


331
332
333
# File 'lib/celluloid.rb', line 331

# Call `terminate!` on the behavior_proxy of the actor stored in the calling
# thread's :celluloid_actor thread-local.
def terminate
  proxy = Thread.current[:celluloid_actor].behavior_proxy
  proxy.terminate!
end

#timeout(duration) ⇒ Object

Timeout on task suspension (e.g. sync calls to other actors)


416
417
418
419
420
# File 'lib/celluloid.rb', line 416

# Run the caller's block inside the `timeout` method of the actor stored in
# the calling thread's :celluloid_actor thread-local, passing +duration+.
def timeout(duration)
  actor = Thread.current[:celluloid_actor]
  actor.timeout(duration) { yield }
end

#unlink(actor) ⇒ Object

Remove links to another actor


381
382
383
# File 'lib/celluloid.rb', line 381

# Forwards +actor+ to Actor.unlink and returns its result.
def unlink(actor)
  Actor.unlink(actor)
end

#unmonitor(actor) ⇒ Object

Stop waiting for exit events from another actor


371
372
373
# File 'lib/celluloid.rb', line 371

# Forwards +actor+ to Actor.unmonitor and returns its result.
def unmonitor(actor)
  Actor.unmonitor(actor)
end

#wait(name) ⇒ Object

Wait for the given signal


341
342
343
# File 'lib/celluloid.rb', line 341

# Forward +name+ to the `wait` method of the actor stored in the calling
# thread's :celluloid_actor thread-local.
def wait(name)
  actor = Thread.current[:celluloid_actor]
  actor.wait(name)
end