Module: Celluloid

Extended by:
Celluloid
Included in:
Celluloid, IncidentReporter, Notifications::Fanout, PoolManager, SupervisionGroup
Defined in:
lib/celluloid.rb,
lib/celluloid/fsm.rb,
lib/celluloid/uuid.rb,
lib/celluloid/tasks.rb,
lib/celluloid/rspec.rb,
lib/celluloid/links.rb,
lib/celluloid/calls.rb,
lib/celluloid/actor.rb,
lib/celluloid/future.rb,
lib/celluloid/method.rb,
lib/celluloid/logger.rb,
lib/celluloid/thread.rb,
lib/celluloid/mailbox.rb,
lib/celluloid/signals.rb,
lib/celluloid/registry.rb,
lib/celluloid/task_set.rb,
lib/celluloid/responses.rb,
lib/celluloid/condition.rb,
lib/celluloid/receivers.rb,
lib/celluloid/properties.rb,
lib/celluloid/stack_dump.rb,
lib/celluloid/supervisor.rb,
lib/celluloid/call_chain.rb,
lib/celluloid/cpu_counter.rb,
lib/celluloid/pool_manager.rb,
lib/celluloid/thread_handle.rb,
lib/celluloid/internal_pool.rb,
lib/celluloid/notifications.rb,
lib/celluloid/system_events.rb,
lib/celluloid/evented_mailbox.rb,
lib/celluloid/logging/incident.rb,
lib/celluloid/tasks/task_fiber.rb,
lib/celluloid/logging/log_event.rb,
lib/celluloid/tasks/task_thread.rb,
lib/celluloid/supervision_group.rb,
lib/celluloid/proxies/sync_proxy.rb,
lib/celluloid/proxies/actor_proxy.rb,
lib/celluloid/proxies/block_proxy.rb,
lib/celluloid/proxies/async_proxy.rb,
lib/celluloid/logging/ring_buffer.rb,
lib/celluloid/proxies/future_proxy.rb,
lib/celluloid/proxies/abstract_proxy.rb,
lib/celluloid/logging/incident_logger.rb,
lib/celluloid/logging/incident_reporter.rb

Defined Under Namespace

Modules: CPUCounter, ClassMethods, FSM, InstanceMethods, Logger, Notifications, Properties, UUID Classes: AbortError, AbstractProxy, Actor, ActorProxy, AsyncCall, AsyncProxy, BlockCall, BlockProxy, BlockResponse, Call, CallChain, Condition, ConditionError, DeadActorError, DeadTaskError, ErrorResponse, EventedMailbox, ExitEvent, FiberStackError, Future, FutureProxy, Incident, IncidentLogger, IncidentReporter, InternalPool, LinkingRequest, LinkingResponse, Links, LogEvent, Mailbox, MailboxDead, MailboxShutdown, Method, NamingRequest, NotActorError, NotTaskError, PoolManager, Receiver, Receivers, Registry, Response, ResumableError, RingBuffer, SignalConditionRequest, Signals, StackDump, SuccessResponse, SupervisionGroup, Supervisor, SyncCall, SyncProxy, SystemEvent, Task, TaskFiber, TaskSet, TaskThread, TerminationRequest, Thread, ThreadHandle, TimeoutError

Constant Summary

VERSION =
'0.15.2'
Error =
Class.new StandardError
BARE_OBJECT_WARNING_MESSAGE =

Warning message added to Celluloid objects accessed outside their actors

"WARNING: BARE CELLULOID OBJECT "
TIMER_QUANTUM =

Timer accuracy enforced by the tests (50ms)

0.05
LINKING_TIMEOUT =

linking times out after 5 seconds

5
OWNER_IVAR =

reference to owning actor

:@celluloid_owner

Class Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Attribute Details

+ (Object) internal_pool

Returns the value of attribute internal_pool



20
21
22
# File 'lib/celluloid.rb', line 20

def internal_pool
  @internal_pool
end

+ (Object) logger

Returns the value of attribute logger



21
22
23
# File 'lib/celluloid.rb', line 21

def logger
  @logger
end

+ (Object) shutdown_timeout

Internal thread pool Thread-safe logger class Default task type to use How long actors have to terminate



23
24
25
# File 'lib/celluloid.rb', line 23

def shutdown_timeout
  @shutdown_timeout
end

+ (Object) task_class

Returns the value of attribute task_class



22
23
24
# File 'lib/celluloid.rb', line 22

def task_class
  @task_class
end

Class Method Details

+ (Boolean) actor?

Are we currently inside of an actor?

Returns:

  • (Boolean)


49
50
51
# File 'lib/celluloid.rb', line 49

def actor?
  !!Thread.current[:celluloid_actor]
end

+ (Object) boot



103
104
105
106
# File 'lib/celluloid.rb', line 103

def boot
  init
  start
end

+ (Object) cores Also known as: cpus, ncpus

Obtain the number of CPUs in the system



64
65
66
# File 'lib/celluloid.rb', line 64

def cores
 CPUCounter.cores
end

+ (Object) detect_recursion

Detect if a particular call is recursing through multiple actors



77
78
79
80
81
82
83
84
85
86
# File 'lib/celluloid.rb', line 77

def detect_recursion
  actor = Thread.current[:celluloid_actor]
  return unless actor

  task = Thread.current[:celluloid_task]
  return unless task

  chain_id = CallChain.current_id
  actor.tasks.to_a.any? { |t| t != task && t.chain_id == chain_id }
end

+ (Object) exception_handler(&block)

Define an exception handler for actor crashes



89
90
91
# File 'lib/celluloid.rb', line 89

def exception_handler(&block)
  Logger.exception_handler(&block)
end

+ (Object) included(klass)



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/celluloid.rb', line 25

def included(klass)
  klass.send :extend,  ClassMethods
  klass.send :include, InstanceMethods

  klass.send :extend, Properties

  klass.property :mailbox_class, :default => Celluloid::Mailbox
  klass.property :proxy_class,   :default => Celluloid::ActorProxy
  klass.property :task_class,    :default => Celluloid.task_class
  klass.property :mailbox_size

  klass.property :execute_block_on_receiver,
    :default => [:after, :every, :receive],
    :multi   => true

  klass.property :finalizer
  klass.property :exit_handler

  klass.send(:define_singleton_method, :trap_exit) do |*args|
    exit_handler(*args)
  end
end

+ (Object) init



108
109
110
# File 'lib/celluloid.rb', line 108

def init
  self.internal_pool = InternalPool.new
end

+ (Object) mailbox

Retrieve the mailbox for the current thread or lazily initialize it



54
55
56
# File 'lib/celluloid.rb', line 54

def mailbox
  Thread.current[:celluloid_mailbox] ||= Celluloid::Mailbox.new
end

+ (Object) register_shutdown



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/celluloid.rb', line 123

def register_shutdown
  return if @shutdown_registered
  # Terminate all actors at exit
  at_exit do
    if defined?(RUBY_ENGINE) && RUBY_ENGINE == "ruby" && RUBY_VERSION >= "1.9"
      # workaround for MRI bug losing exit status in at_exit block
      # http://bugs.ruby-lang.org/issues/5218
      exit_status = $!.status if $!.is_a?(SystemExit)
      Celluloid.shutdown
      exit exit_status if exit_status
    else
      Celluloid.shutdown
    end
  end
  @shutdown_registered = true
end

+ (Boolean) running?

Returns:

  • (Boolean)


119
120
121
# File 'lib/celluloid.rb', line 119

def running?
  internal_pool
end

+ (Object) shutdown

Shut down all running actors



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/celluloid.rb', line 141

def shutdown
  actors = Actor.all

  Timeout.timeout(shutdown_timeout) do
    internal_pool.shutdown

    Logger.debug "Terminating #{actors.size} #{(actors.size > 1) ? 'actors' : 'actor'}..." if actors.size > 0

    # Attempt to shut down the supervision tree, if available
    Supervisor.root.terminate if Supervisor.root

    # Actors cannot self-terminate, you must do it for them
    actors.each do |actor|
      begin
        actor.terminate!
      rescue DeadActorError
      end
    end

    actors.each do |actor|
      begin
        Actor.join(actor)
      rescue DeadActorError
      end
    end
  end
rescue Timeout::Error
  Logger.error("Couldn't cleanly terminate all actors in #{shutdown_timeout} seconds!")
  actors.each do |actor|
    begin
      Actor.kill(actor)
    rescue DeadActorError, MailboxDead
    end
  end
ensure
  internal_pool.kill
end

+ (Object) stack_dump(output = STDERR) Also known as: dump

Perform a stack dump of all actors to the given output object



71
72
73
# File 'lib/celluloid.rb', line 71

def stack_dump(output = STDERR)
  Celluloid::StackDump.new.dump(output)
end

+ (Object) start

Launch default services FIXME: We should set up the supervision hierarchy here



114
115
116
117
# File 'lib/celluloid.rb', line 114

def start
  Celluloid::Notifications::Fanout.supervise_as :notifications_fanout
  Celluloid::IncidentReporter.supervise_as :default_incident_reporter, STDERR
end

+ (Object) suspend(status, waiter)



93
94
95
96
97
98
99
100
101
# File 'lib/celluloid.rb', line 93

def suspend(status, waiter)
  task = Thread.current[:celluloid_task]
  if task && !Celluloid.exclusive?
    waiter.before_suspend(task) if waiter.respond_to?(:before_suspend)
    Task.suspend(status)
  else
    waiter.wait
  end
end

+ (Object) uuid

Generate a Universally Unique Identifier



59
60
61
# File 'lib/celluloid.rb', line 59

def uuid
  UUID.generate
end

+ (Object) version



179
180
181
# File 'lib/celluloid.rb', line 179

def version
  VERSION
end

Instance Method Details

- (Object) abort(cause)

Raise an exception in sender context, but stay running

Raises:



327
328
329
330
331
332
333
334
# File 'lib/celluloid.rb', line 327

def abort(cause)
  cause = case cause
    when String then RuntimeError.new(cause)
    when Exception then cause
    else raise TypeError, "Exception object/String expected, but #{cause.class} received"
  end
  raise AbortError.new(cause)
end

- (Object) after(interval, &block)

Call a block after a given interval, returning a Celluloid::Timer object



441
442
443
# File 'lib/celluloid.rb', line 441

def after(interval, &block)
  Thread.current[:celluloid_actor].after(interval, &block)
end

- (Object) async(meth = nil, *args, &block)

Handle async calls within an actor itself



460
461
462
# File 'lib/celluloid.rb', line 460

def async(meth = nil, *args, &block)
  Thread.current[:celluloid_actor].proxy.async meth, *args, &block
end

- (Object) call_chain_id

Obtain the UUID of the current call chain



357
358
359
# File 'lib/celluloid.rb', line 357

def call_chain_id
  CallChain.current_id
end

- (Object) current_actor

Obtain the current_actor



352
353
354
# File 'lib/celluloid.rb', line 352

def current_actor
  Actor.current
end

- (Object) defer(&block)

Perform a blocking or computationally intensive action inside an asynchronous thread pool, allowing the sender to continue processing other messages in its mailbox in the meantime



453
454
455
456
457
# File 'lib/celluloid.rb', line 453

def defer(&block)
  # This implementation relies on the present implementation of
  # Celluloid::Future, which uses a thread from InternalPool to run the block
  Future.new(&block).value
end

- (Object) every(interval, &block)

Call a block every given interval, returning a Celluloid::Timer object



446
447
448
# File 'lib/celluloid.rb', line 446

def every(interval, &block)
  Thread.current[:celluloid_actor].every(interval, &block)
end

- (Object) exclusive(&block)

Run given block in an exclusive mode: all synchronous calls block the whole actor, not only current message processing.



430
431
432
# File 'lib/celluloid.rb', line 430

def exclusive(&block)
  Thread.current[:celluloid_task].exclusive(&block)
end

- (Boolean) exclusive?

Are we currently exclusive

Returns:

  • (Boolean)


435
436
437
438
# File 'lib/celluloid.rb', line 435

def exclusive?
  task = Thread.current[:celluloid_task]
  task && task.exclusive?
end

- (Object) future(meth = nil, *args, &block)

Handle calls to future within an actor itself



465
466
467
# File 'lib/celluloid.rb', line 465

def future(meth = nil, *args, &block)
  Thread.current[:celluloid_actor].proxy.future meth, *args, &block
end

Link this actor to another, allowing it to crash or react to errors



382
383
384
# File 'lib/celluloid.rb', line 382

def link(actor)
  Actor.link(actor)
end

- (Boolean) linked_to?(actor)

Is this actor linked to another?

Returns:

  • (Boolean)


397
398
399
# File 'lib/celluloid.rb', line 397

def linked_to?(actor)
  Actor.linked_to?(actor)
end

Obtain the Celluloid::Links for this actor



367
368
369
# File 'lib/celluloid.rb', line 367

def links
  Thread.current[:celluloid_actor].links
end

- (Object) monitor(actor)

Watch for exit events from another actor



372
373
374
# File 'lib/celluloid.rb', line 372

def monitor(actor)
  Actor.monitor(actor)
end

- (Boolean) monitoring?(actor)

Are we monitoring another actor?

Returns:

  • (Boolean)


392
393
394
# File 'lib/celluloid.rb', line 392

def monitoring?(actor)
  Actor.monitoring?(actor)
end

- (Object) receive(timeout = nil, &block)

Receive an asynchronous message via the actor protocol



402
403
404
405
406
407
408
409
# File 'lib/celluloid.rb', line 402

def receive(timeout = nil, &block)
  actor = Thread.current[:celluloid_actor]
  if actor
    actor.receive(timeout, &block)
  else
    Celluloid.mailbox.receive(timeout, &block)
  end
end

- (Object) signal(name, value = nil)

Send a signal with the given name to all waiting methods



342
343
344
# File 'lib/celluloid.rb', line 342

def signal(name, value = nil)
  Thread.current[:celluloid_actor].signal name, value
end

- (Object) sleep(interval)

Sleep letting the actor continue processing messages



412
413
414
415
416
417
418
419
# File 'lib/celluloid.rb', line 412

def sleep(interval)
  actor = Thread.current[:celluloid_actor]
  if actor
    actor.sleep(interval)
  else
    Kernel.sleep interval
  end
end

- (Object) tasks

Obtain the running tasks for this actor



362
363
364
# File 'lib/celluloid.rb', line 362

def tasks
  Thread.current[:celluloid_actor].tasks.to_a
end

- (Object) terminate

Terminate this actor



337
338
339
# File 'lib/celluloid.rb', line 337

def terminate
  Thread.current[:celluloid_actor].proxy.terminate!
end

- (Object) timeout(duration)

Timeout on task suspension (eg Sync calls to other actors)



422
423
424
425
426
# File 'lib/celluloid.rb', line 422

def timeout(duration)
  Thread.current[:celluloid_actor].timeout(duration) do
    yield
  end
end

Remove links to another actor



387
388
389
# File 'lib/celluloid.rb', line 387

def unlink(actor)
  Actor.unlink(actor)
end

- (Object) unmonitor(actor)

Stop waiting for exit events from another actor



377
378
379
# File 'lib/celluloid.rb', line 377

def unmonitor(actor)
  Actor.unmonitor(actor)
end

- (Object) wait(name)

Wait for the given signal



347
348
349
# File 'lib/celluloid.rb', line 347

def wait(name)
  Thread.current[:celluloid_actor].wait name
end