Class: Celluloid::Actor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/celluloid/actor.rb,
lib/celluloid/actor/system.rb,
lib/celluloid/actor/manager.rb,
lib/celluloid/system_events.rb

Overview

Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.

Defined Under Namespace

Classes: Manager, Sleeper, System

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(behavior, options) ⇒ Actor

Returns a new instance of Actor


102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/celluloid/actor.rb', line 102

# Build the actor's internal state around the given behavior. Nothing is
# started here: @running is left false and no thread is created.
#
# @param behavior the object stored as this actor's behavior
# @param options hash-like options; the keys read here are:
#   :actor_system  - required (fetched with no default, so a missing key raises)
#   :mailbox_class - class instantiated for the mailbox (default: Mailbox)
#   :mailbox_size  - assigned to the mailbox's max_size (default: nil)
#   :task_class    - falls back to Celluloid.task_class when nil/false/absent
#   :exclusive     - default: false
def initialize(behavior, options)
  @behavior         = behavior

  @actor_system     = options.fetch(:actor_system)
  @mailbox          = options.fetch(:mailbox_class, Mailbox).new
  @mailbox.max_size = options.fetch(:mailbox_size, nil)

  @task_class   = options[:task_class] || Celluloid.task_class
  @exit_handler = method(:default_exit_handler)
  @exclusive    = options.fetch(:exclusive, false)

  # @timers must exist before @receivers, which is constructed with it.
  @timers    = Timers::Group.new
  @tasks     = Internals::TaskSet.new
  @links     = Internals::Links.new
  @handlers  = Internals::Handlers.new
  @receivers = Internals::Receivers.new(@timers)
  @signals   = Internals::Signals.new
  @running   = false
  @name      = nil

  # Route every SystemEvent through handle_system_event.
  handle(SystemEvent) do |message|
    handle_system_event message
  end
end

Instance Attribute Details

#behavior ⇒ Object (readonly)

Returns the value of attribute behavior


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @behavior, which is assigned from the first constructor argument.
def behavior
  @behavior
end

#exit_handler=(value) ⇒ Object (writeonly)

Sets the attribute exit_handler

Parameters:

  • value

    the value to set the attribute exit_handler to.


9
10
11
# File 'lib/celluloid/actor.rb', line 9

# Writer for @exit_handler.
#
# @param value the new exit handler
def exit_handler=(value)
  @exit_handler = value
end

#links ⇒ Object (readonly)

Returns the value of attribute links


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @links.
def links
  @links
end

#mailbox ⇒ Object (readonly)

Returns the value of attribute mailbox


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @mailbox.
def mailbox
  @mailbox
end

#name ⇒ Object (readonly)

Returns the value of attribute name


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @name.
def name
  @name
end

#proxy ⇒ Object (readonly)

Returns the value of attribute proxy


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @proxy.
def proxy
  @proxy
end

#tasks ⇒ Object (readonly)

Returns the value of attribute tasks


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @tasks.
def tasks
  @tasks
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @thread.
def thread
  @thread
end

#timers ⇒ Object (readonly)

Returns the value of attribute timers


8
9
10
# File 'lib/celluloid/actor.rb', line 8

# Reader for @timers.
def timers
  @timers
end

Class Method Details

.all ⇒ Object

Obtain all running actors in the system


49
50
51
# File 'lib/celluloid/actor.rb', line 49

# Ask the current Celluloid actor system for its running actors.
#
# @return whatever Celluloid.actor_system.running returns
def all
  actor_system = Celluloid.actor_system
  actor_system.running
end

.async(mailbox, meth, *args, &block) ⇒ Object

Invoke a method asynchronously on an actor via its mailbox


37
38
39
40
# File 'lib/celluloid/actor.rb', line 37

# Dispatch +meth+ through a Proxy::Async built around +mailbox+.
#
# @param mailbox the target mailbox handed to the proxy
# @param meth name of the method to invoke
# @param args positional arguments forwarded to the invocation
# @param block optional block forwarded to the invocation
# @return whatever the proxy's method_missing returns
def async(mailbox, meth, *args, &block)
  Proxy::Async.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end

.call(mailbox, meth, *args, &block) ⇒ Object

Invoke a method on the given actor via its mailbox


31
32
33
34
# File 'lib/celluloid/actor.rb', line 31

# Dispatch +meth+ through a Proxy::Sync built around +mailbox+.
#
# @param mailbox the target mailbox handed to the proxy
# @param meth name of the method to invoke
# @param args positional arguments forwarded to the invocation
# @param block optional block forwarded to the invocation
# @return whatever the proxy's method_missing returns
def call(mailbox, meth, *args, &block)
  Proxy::Sync.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end

.current ⇒ Object

Obtain the current actor


17
18
19
20
21
# File 'lib/celluloid/actor.rb', line 17

# Look up the actor stored on the current thread and return its behavior proxy.
#
# @return the result of behavior_proxy on the thread's actor
# @raise [NotActorError] when Thread.current[:celluloid_actor] is not set
def current
  actor = Thread.current[:celluloid_actor]
  return actor.behavior_proxy if actor

  fail NotActorError, "not in actor scope"
end

.future(mailbox, meth, *args, &block) ⇒ Object

Call a method asynchronously and retrieve its value later


43
44
45
46
# File 'lib/celluloid/actor.rb', line 43

# Dispatch +meth+ through a Proxy::Future built around +mailbox+.
#
# @param mailbox the target mailbox handed to the proxy
# @param meth name of the method to invoke
# @param args positional arguments forwarded to the invocation
# @param block optional block forwarded to the invocation
# @return whatever the proxy's method_missing returns
def future(mailbox, meth, *args, &block)
  Proxy::Future.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end

.join(actor, timeout = nil) ⇒ Object

Wait for an actor to terminate


96
97
98
99
# File 'lib/celluloid/actor.rb', line 96

# Join the actor's thread, optionally bounded by +timeout+.
#
# @param actor object exposing #thread
# @param timeout passed straight to the thread's join (nil by default)
# @return the +actor+ argument, regardless of what join returned
def join(actor, timeout = nil)
  actor.tap { |joined| joined.thread.join(timeout) }
end

.link(actor) ⇒ Object

Link to another actor


66
67
68
69
# File 'lib/celluloid/actor.rb', line 66

# Monitor +actor+, then record it in the current thread's actor links.
#
# @param actor the actor to link to
# @return the result of appending +actor+ to the links collection
def link(actor)
  monitor actor
  current_actor = Thread.current[:celluloid_actor]
  current_actor.links << actor
end

.linked_to?(actor) ⇒ Boolean

Are we bidirectionally linked to the given actor?

Returns:

  • (Boolean)

83
84
85
# File 'lib/celluloid/actor.rb', line 83

# True when +actor+ is being monitored and also appears in the current
# thread's actor links. The links lookup is skipped when monitoring? is falsy.
#
# @param actor the actor to check
# @return the falsy result of monitoring?, or the result of links.include?
def linked_to?(actor)
  watching = monitoring?(actor)
  watching && Thread.current[:celluloid_actor].links.include?(actor)
end

.monitor(actor) ⇒ Object

Watch for exit events from another actor


54
55
56
57
# File 'lib/celluloid/actor.rb', line 54

# Issue a :link linking request from the current thread's actor to +actor+.
#
# @param actor the actor to monitor
# @return the result of linking_request
# @raise [NotActorError] when Celluloid.actor? is falsy
def monitor(actor)
  unless Celluloid.actor?
    fail NotActorError, "can't link outside actor context"
  end

  current_actor = Thread.current[:celluloid_actor]
  current_actor.linking_request(actor, :link)
end

.monitoring?(actor) ⇒ Boolean

Are we monitoring the given actor?

Returns:

  • (Boolean)

78
79
80
# File 'lib/celluloid/actor.rb', line 78

# Check whether +actor+'s links contain the current actor (Actor.current).
#
# @param actor object exposing #links
# @return the result of links.include?
def monitoring?(actor)
  peer_links = actor.links
  peer_links.include?(Actor.current)
end

.registered_name ⇒ Object

Obtain the name of the current actor


24
25
26
27
28
# File 'lib/celluloid/actor.rb', line 24

# Return the name of the actor stored on the current thread.
#
# @return the result of #name on the thread's actor
# @raise [NotActorError] when Thread.current[:celluloid_actor] is not set
def registered_name
  actor = Thread.current[:celluloid_actor]
  return actor.name if actor

  fail NotActorError, "not in actor scope"
end

.unlink(actor) ⇒ Object

Unlink from another actor


72
73
74
75
# File 'lib/celluloid/actor.rb', line 72

# Stop monitoring +actor+, then drop it from the current thread's actor links.
#
# @param actor the actor to unlink from
# @return the result of deleting +actor+ from the links collection
def unlink(actor)
  unmonitor actor
  current_actor = Thread.current[:celluloid_actor]
  current_actor.links.delete(actor)
end

.unmonitor(actor) ⇒ Object

Stop waiting for exit events from another actor


60
61
62
63
# File 'lib/celluloid/actor.rb', line 60

# Issue an :unlink linking request from the current thread's actor to +actor+.
#
# @param actor the actor to stop monitoring
# @return the result of linking_request
# @raise [NotActorError] when Celluloid.actor? is falsy
def unmonitor(actor)
  unless Celluloid.actor?
    fail NotActorError, "can't link outside actor context"
  end

  current_actor = Thread.current[:celluloid_actor]
  current_actor.linking_request(actor, :unlink)
end

Instance Method Details

#after(interval, &block) ⇒ Object

Schedule a block to run after the given interval has elapsed


238
239
240
# File 'lib/celluloid/actor.rb', line 238

# Register +block+ with @timers.after; when the timer fires, the block is run
# through #task with the :timer task type.
#
# @param interval delay handed to @timers.after
# @return whatever @timers.after returns
def after(interval, &block)
  @timers.after(interval) do
    task(:timer, &block)
  end
end

#behavior_proxy ⇒ Object


139
140
141
# File 'lib/celluloid/actor.rb', line 139

# Return the proxy exposed by this actor's behavior (@behavior.proxy).
def behavior_proxy
  @behavior.proxy
end

#cleanup(exit_event) ⇒ Object

Clean up after this actor


314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/celluloid/actor.rb', line 314

# Tear down this actor: shut the mailbox, deliver +exit_event+ to linked
# actors whose mailboxes are still alive, and terminate every task.
# Any StandardError raised along the way is logged rather than propagated.
#
# @param exit_event the event pushed onto each live linked mailbox
def cleanup(exit_event)
  Celluloid::Probe.actor_died(self) if $CELLULOID_MONITORING
  @mailbox.shutdown

  @links.each do |linked|
    next unless linked.mailbox.alive?

    linked.mailbox << exit_event
  end

  # Iterate over an array copy of the task set.
  tasks.to_a.each do |running_task|
    begin
      running_task.terminate
    rescue DeadTaskError
      # TODO: not tested (failed on Travis)
    end
  end
rescue => error
  # TODO: metadata
  Internals::Logger.crash("CLEANUP CRASHED!", error)
end

#default_exit_handler(event) ⇒ Object


291
292
293
# File 'lib/celluloid/actor.rb', line 291

# Exit handler installed by the constructor: re-raise the event's reason,
# if it has one; otherwise do nothing.
#
# @param event object exposing #reason
# @return [nil] when the event carries no reason
def default_exit_handler(event)
  return unless event.reason

  fail event.reason
end

#every(interval, &block) ⇒ Object

Schedule a block to run repeatedly at the given interval


243
244
245
# File 'lib/celluloid/actor.rb', line 243

# Register +block+ with @timers.every; each time the timer fires, the block
# is run through #task with the :timer task type.
#
# @param interval period handed to @timers.every
# @return whatever @timers.every returns
def every(interval, &block)
  @timers.every(interval) do
    task(:timer, &block)
  end
end

#handle(*patterns, &block) ⇒ Object

Register a new handler for a given pattern


223
224
225
# File 'lib/celluloid/actor.rb', line 223

# Register +block+ as the handler for the given patterns by delegating to
# @handlers.handle.
#
# @param patterns one or more patterns forwarded unchanged
# @return whatever @handlers.handle returns
def handle(*patterns, &block)
  @handlers.handle(*patterns, &block)
end

#handle_crash(exception) ⇒ Object

Handle any exceptions that occur within a running actor


296
297
298
299
300
301
302
# File 'lib/celluloid/actor.rb', line 296

# Log a crash and shut the actor down with an ExitEvent carrying the
# exception. A StandardError raised while doing so is itself logged.
#
# @param exception the exception that crashed the actor
def handle_crash(exception)
  # TODO: add meta info
  Internals::Logger.crash("Actor crashed!", exception)
  exit_event = ExitEvent.new(behavior_proxy, exception)
  shutdown(exit_event)
rescue => secondary
  Internals::Logger.crash("Actor#handle_crash CRASHED!", secondary)
end

#handle_message(message) ⇒ Object

Handle standard low-priority messages


282
283
284
285
286
287
288
289
# File 'lib/celluloid/actor.rb', line 282

# Offer +message+ to @handlers first, then to @receivers only if the handlers
# did not take it. If neither does, log it as discarded when
# $CELLULOID_DEBUG is set.
#
# @param message the message to dispatch
# @return the +message+ argument, always
def handle_message(message)
  handled = @handlers.handle_message(message) || @receivers.handle_message(message)

  if !handled && $CELLULOID_DEBUG
    Internals::Logger.debug "Discarded message (unhandled): #{message}"
  end

  message
end

#handle_system_event(event) ⇒ Object

Handle high-priority system event messages


4
5
6
7
8
9
10
# File 'lib/celluloid/system_events.rb', line 4

# Dispatch a system event to the handler method registered for its class.
#
# SystemEvent.handle is asked for a handler name for event.class; when one
# is returned it is invoked on self with the event. Otherwise the event is
# dropped, with a debug log line when $CELLULOID_DEBUG is set.
#
# @param event the system event to dispatch
# @return the handler's return value, or the logger's result / nil when dropped
def handle_system_event(event)
  if handler = SystemEvent.handle(event.class)
    send(handler, event)
  else
    # Interpolate the parameter itself: the previous `message` was not defined
    # in this method and raised NameError on this path.
    Internals::Logger.debug "Discarded message (unhandled): #{event}" if $CELLULOID_DEBUG
  end
end

#kill(actor) ⇒ Object

Forcibly kill a given actor


89
90
91
92
# File 'lib/celluloid/actor.rb', line 89

# Kill the actor's thread, then shut down its mailbox if it is still alive.
#
# @param actor object exposing #thread and #mailbox
# @return the mailbox shutdown result, or nil when the mailbox was not alive
def kill(actor)
  actor.thread.kill
  return unless actor.mailbox.alive?

  actor.mailbox.shutdown
end

#linking_request(receiver, type) ⇒ Object

Perform a linking request with another actor


181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/celluloid/actor.rb', line 181

# Send a LinkingRequest of the given type to +receiver+ and wait, for up to
# LINKING_TIMEOUT seconds, for the matching LinkingResponse to arrive in this
# actor's mailbox. The whole exchange runs inside Celluloid.exclusive.
#
# @param receiver the actor whose mailbox receives the request
# @param type the request type (the class-level helpers pass :link / :unlink)
# @return [nil] once the matching response has been received
# @raise [TaskTimeout] when no matching response arrives within LINKING_TIMEOUT
# @raise [RuntimeError] when a message of an unexpected type is received
def linking_request(receiver, type)
  Celluloid.exclusive do
    receiver.mailbox << LinkingRequest.new(Actor.current, type)
    system_events = []

    Timers::Wait.for(LINKING_TIMEOUT) do |remaining|
      begin
        # Only accept a response that comes from the receiver's mailbox
        # address and carries the same request type.
        message = @mailbox.receive(remaining) do |msg|
          msg.is_a?(LinkingResponse) &&
          msg.actor.mailbox.address == receiver.mailbox.address &&
          msg.type == type
        end
      rescue TaskTimeout
        next # IO reactor did something, no message in queue yet.
      end

      if message.instance_of? LinkingResponse
        Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING
        # Put back the system events that were set aside while waiting.
        system_events.each { |ev| @mailbox << ev }
        return
      elsif message.is_a? SystemEvent
        # Queue up pending system events to be processed after we've successfully linked
        system_events << message
      # NOTE(review): the error text lists NilClass as an expected type, but a
      # nil message reaches this branch and raises — confirm whether
      # @mailbox.receive can return nil here.
      else fail "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent."
      end
    end

    fail TaskTimeout, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded with receiver: #{receiver}"
  end
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message


228
229
230
231
232
233
234
235
# File 'lib/celluloid/actor.rb', line 228

# Receive the next message from @receivers, transparently handling (and
# skipping over) any SystemEvent that arrives in the meantime.
#
# @param timeout forwarded to every @receivers.receive call
# @param block optional block forwarded to @receivers.receive
# @return the first received message that is not a SystemEvent
def receive(timeout = nil, &block)
  message = @receivers.receive(timeout, &block)

  while message.is_a?(SystemEvent)
    handle_system_event(message)
    message = @receivers.receive(timeout, &block)
  end

  message
end

#run ⇒ Object

Run the actor loop


149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/celluloid/actor.rb', line 149

# Main actor loop: while @running is set, wait on the timers, check the
# mailbox, and dispatch whatever message arrives. When the loop ends the
# actor is shut down; any exception escaping the loop is routed to
# handle_crash instead.
def run
  while @running
    begin
      @timers.wait do |interval|
        # Clamp a negative wait interval to zero before checking the mailbox.
        interval = 0 if interval && interval < 0

        if message = @mailbox.check(interval)
          handle_message(message)

          # Handling the message may have cleared @running (see #terminate).
          break unless @running
        end
      end
    rescue MailboxShutdown
      @running = false
    rescue MailboxDead
      # TODO: not tested (but fails occasionally in tests)
      @running = false
    end
  end

  shutdown
rescue ::Exception => ex
  handle_crash(ex)
  # StandardError and Celluloid::Interruption stop here; every other
  # exception class is re-raised after the crash has been handled.
  raise unless ex.is_a?(StandardError) || ex.is_a?(Celluloid::Interruption)
end

#setup_thread ⇒ Object


143
144
145
146
# File 'lib/celluloid/actor.rb', line 143

# Register this actor and its mailbox on the current thread under the
# :celluloid_actor and :celluloid_mailbox keys.
#
# @return the mailbox (the value of the last assignment)
def setup_thread
  thread_locals = Thread.current
  thread_locals[:celluloid_actor] = self
  thread_locals[:celluloid_mailbox] = @mailbox
end

#shutdown(exit_event = ExitEvent.new(behavior_proxy)) ⇒ Object

Handle cleaning up this actor after it exits


305
306
307
308
309
310
311
# File 'lib/celluloid/actor.rb', line 305

# Shut down the behavior and clean up the actor, then clear the
# :celluloid_actor and :celluloid_mailbox entries on the current thread.
# The thread entries are cleared even if shutdown or cleanup raises.
#
# @param exit_event event handed to cleanup; defaults to a new ExitEvent
#   built from behavior_proxy
# @return the result of cleanup
def shutdown(exit_event = ExitEvent.new(behavior_proxy))
  begin
    @behavior.shutdown
    cleanup(exit_event)
  ensure
    thread_locals = Thread.current
    thread_locals[:celluloid_actor]   = nil
    thread_locals[:celluloid_mailbox] = nil
  end
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods


213
214
215
# File 'lib/celluloid/actor.rb', line 213

# Broadcast +value+ under the signal +name+ by delegating to
# @signals.broadcast.
#
# @param name the signal name
# @param value the value passed along with the signal (nil by default)
# @return whatever @signals.broadcast returns
def signal(name, value = nil)
  @signals.broadcast name, value
end

#sleep(interval) ⇒ Object

Sleep for the given amount of time


276
277
278
279
# File 'lib/celluloid/actor.rb', line 276

# Suspend the caller in the :sleeping state, handing Celluloid.suspend a
# Sleeper built from this actor's timers and the requested interval.
#
# @param interval the sleep duration passed to the Sleeper
# @return whatever Celluloid.suspend returns
def sleep(interval)
  Celluloid.suspend(:sleeping, Sleeper.new(@timers, interval))
end

#start ⇒ Object


127
128
129
130
131
132
133
134
135
136
137
# File 'lib/celluloid/actor.rb', line 127

# Start the actor: mark it running, create its thread handle (whose block
# registers the actor on the thread and enters the run loop), and build the
# actor proxy from the mailbox and the thread handle.
def start
  # Set before the thread is created, since #run loops on @running.
  @running = true
  @thread = Internals::ThreadHandle.new(@actor_system, :actor) do
    setup_thread
    run
  end

  @proxy = Proxy::Actor.new(@mailbox, @thread)
  # Optional notifications, each guarded by its own global flag.
  Celluloid::Probe.actor_created(self) if $CELLULOID_MONITORING
  Celluloid::Actor::Manager.actor_created(self) if $CELLULOID_MANAGED
end

#task(task_type, meta = nil) ⇒ Object

Run a method inside a task unless it's exclusive


334
335
336
337
338
339
340
341
342
# File 'lib/celluloid/actor.rb', line 334

# Wrap the given block in a new task of @task_class and resume it. When the
# actor is exclusive, the block runs inside Celluloid.exclusive.
#
# @param task_type the task type handed to the task class
# @param meta optional metadata handed to the task class (nil by default)
# @yield the work to run inside the task
# @return whatever the task's resume returns
def task(task_type, meta = nil)
  new_task = @task_class.new(task_type, meta) do
    if @exclusive
      Celluloid.exclusive { yield }
    else
      yield
    end
  end

  new_task.resume
end

#terminate ⇒ Object

Terminate this actor


176
177
178
# File 'lib/celluloid/actor.rb', line 176

# Clear the @running flag. The loop in #run checks this flag, so the actor
# stops looping once it observes the change.
def terminate
  @running = false
end

#timeout(duration) ⇒ Object


247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/celluloid/actor.rb', line 247

# Run the given block with a time limit. A timer is scheduled that, if it
# fires, resumes the current task with a TaskTimeout whose backtrace is the
# caller's. The timer is cancelled once the block finishes or raises.
#
# @param duration delay handed to @timers.after
# @yield the work to run under the time limit
# @return the block's return value
def timeout(duration)
  origin_backtrace = caller
  waiting_task = Task.current
  timer = @timers.after(duration) do
    expiry = TaskTimeout.new("execution expired")
    expiry.set_backtrace origin_backtrace
    waiting_task.resume expiry
  end
  yield
ensure
  timer.cancel if timer
end

#wait(name) ⇒ Object

Wait for the given signal


218
219
220
# File 'lib/celluloid/actor.rb', line 218

# Wait for the signal +name+ by delegating to @signals.wait.
#
# @param name the signal name to wait for
# @return whatever @signals.wait returns
def wait(name)
  @signals.wait name
end