Class: Celluloid::Actor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/celluloid/actor.rb,
lib/celluloid/actor/system.rb,
lib/celluloid/system_events.rb

Overview

Actors are Celluloid’s concurrency primitive. They’re implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.

Defined Under Namespace

Classes: Sleeper, System

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(behavior, options) ⇒ Actor

Returns a new instance of Actor.



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/celluloid/actor.rb', line 102

def initialize(behavior, options)
  @behavior         = behavior

  @actor_system     = options.fetch(:actor_system)
  @mailbox          = options.fetch(:mailbox_class, Mailbox).new
  @mailbox.max_size = options.fetch(:mailbox_size, nil)

  @task_class   = options[:task_class] || Celluloid.task_class
  @exit_handler = method(:default_exit_handler)
  @exclusive    = options.fetch(:exclusive, false)

  @timers    = Timers::Group.new
  @tasks     = Internals::TaskSet.new
  @links     = Internals::Links.new
  @handlers  = Internals::Handlers.new
  @receivers = Internals::Receivers.new(@timers)
  @signals   = Internals::Signals.new
  @running   = false
  @name      = nil

  handle(SystemEvent) do |message|
    handle_system_event message
  end
end

Instance Attribute Details

#behaviorObject (readonly)

Returns the value of attribute behavior.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def behavior
  @behavior
end

#exit_handler=(value) ⇒ Object (writeonly)

Sets the attribute exit_handler

Parameters:

  • value

    the value to set the attribute exit_handler to.



9
10
11
# File 'lib/celluloid/actor.rb', line 9

def exit_handler=(value)
  @exit_handler = value
end

Returns the value of attribute links.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def links
  @links
end

#mailboxObject (readonly)

Returns the value of attribute mailbox.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def mailbox
  @mailbox
end

#nameObject (readonly)

Returns the value of attribute name.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def name
  @name
end

#proxyObject (readonly)

Returns the value of attribute proxy.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def proxy
  @proxy
end

#tasksObject (readonly)

Returns the value of attribute tasks.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def tasks
  @tasks
end

#threadObject (readonly)

Returns the value of attribute thread.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def thread
  @thread
end

#timersObject (readonly)

Returns the value of attribute timers.



8
9
10
# File 'lib/celluloid/actor.rb', line 8

def timers
  @timers
end

Class Method Details

.allObject

Obtain all running actors in the system



49
50
51
# File 'lib/celluloid/actor.rb', line 49

def all
  Celluloid.actor_system.running
end

.async(mailbox, meth, *args, &block) ⇒ Object

Invoke a method asynchronously on an actor via its mailbox



37
38
39
40
# File 'lib/celluloid/actor.rb', line 37

def async(mailbox, meth, *args, &block)
  proxy = Proxy::Async.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end

.call(mailbox, meth, *args, &block) ⇒ Object

Invoke a method on the given actor via its mailbox



31
32
33
34
# File 'lib/celluloid/actor.rb', line 31

def call(mailbox, meth, *args, &block)
  proxy = Proxy::Sync.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end

.currentObject

Obtain the current actor

Raises:



17
18
19
20
21
# File 'lib/celluloid/actor.rb', line 17

def current
  actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.behavior_proxy
end

.future(mailbox, meth, *args, &block) ⇒ Object

Call a method asynchronously and retrieve its value later



43
44
45
46
# File 'lib/celluloid/actor.rb', line 43

def future(mailbox, meth, *args, &block)
  proxy = Proxy::Future.new(mailbox, "UnknownClass")
  proxy.method_missing(meth, *args, &block)
end

.join(actor, timeout = nil) ⇒ Object

Wait for an actor to terminate



96
97
98
99
# File 'lib/celluloid/actor.rb', line 96

def join(actor, timeout = nil)
  actor.thread.join(timeout)
  actor
end

Link to another actor



66
67
68
69
# File 'lib/celluloid/actor.rb', line 66

def link(actor)
  monitor actor
  Thread.current[:celluloid_actor].links << actor
end

.linked_to?(actor) ⇒ Boolean

Are we bidirectionally linked to the given actor?

Returns:

  • (Boolean)


83
84
85
# File 'lib/celluloid/actor.rb', line 83

def linked_to?(actor)
  monitoring?(actor) && Thread.current[:celluloid_actor].links.include?(actor)
end

.monitor(actor) ⇒ Object

Watch for exit events from another actor

Raises:



54
55
56
57
# File 'lib/celluloid/actor.rb', line 54

def monitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?
  Thread.current[:celluloid_actor].linking_request(actor, :link)
end

.monitoring?(actor) ⇒ Boolean

Are we monitoring the given actor?

Returns:

  • (Boolean)


78
79
80
# File 'lib/celluloid/actor.rb', line 78

def monitoring?(actor)
  actor.links.include? Actor.current
end

.registered_nameObject

Obtain the name of the current actor

Raises:



24
25
26
27
28
# File 'lib/celluloid/actor.rb', line 24

def registered_name
  actor = Thread.current[:celluloid_actor]
  raise NotActorError, "not in actor scope" unless actor
  actor.name
end

Unlink from another actor



72
73
74
75
# File 'lib/celluloid/actor.rb', line 72

def unlink(actor)
  unmonitor actor
  Thread.current[:celluloid_actor].links.delete actor
end

.unmonitor(actor) ⇒ Object

Stop waiting for exit events from another actor

Raises:



60
61
62
63
# File 'lib/celluloid/actor.rb', line 60

def unmonitor(actor)
  raise NotActorError, "can't link outside actor context" unless Celluloid.actor?
  Thread.current[:celluloid_actor].linking_request(actor, :unlink)
end

Instance Method Details

#after(interval, &block) ⇒ Object

Schedule a block to run at the given time



244
245
246
# File 'lib/celluloid/actor.rb', line 244

def after(interval, &block)
  @timers.after(interval) { task(:timer, &block) }
end

#behavior_proxyObject



142
143
144
# File 'lib/celluloid/actor.rb', line 142

def behavior_proxy
  @behavior.proxy
end

#cleanup(exit_event) ⇒ Object

Clean up after this actor



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/celluloid/actor.rb', line 320

def cleanup(exit_event)
  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  Celluloid::Probe.actor_died(self) if $CELLULOID_MONITORING
  # rubocop:enable Style/GlobalVars

  @mailbox.shutdown
  @links.each do |actor|
    actor.mailbox << exit_event if actor.mailbox.alive?
  end

  tasks.to_a.each do |task|
    begin
      task.terminate
    rescue DeadTaskError
      # TODO: not tested (failed on Travis)
    end
  end
rescue => ex
  # TODO: metadata
  Internals::Logger.crash("CLEANUP CRASHED!", ex)
end

#default_exit_handler(event) ⇒ Object



297
298
299
# File 'lib/celluloid/actor.rb', line 297

def default_exit_handler(event)
  raise event.reason if event.reason
end

#every(interval, &block) ⇒ Object

Schedule a block to run at the given time



249
250
251
# File 'lib/celluloid/actor.rb', line 249

def every(interval, &block)
  @timers.every(interval) { task(:timer, &block) }
end

#handle(*patterns, &block) ⇒ Object

Register a new handler for a given pattern



229
230
231
# File 'lib/celluloid/actor.rb', line 229

def handle(*patterns, &block)
  @handlers.handle(*patterns, &block)
end

#handle_crash(exception) ⇒ Object

Handle any exceptions that occur within a running actor



302
303
304
305
306
307
308
# File 'lib/celluloid/actor.rb', line 302

def handle_crash(exception)
  # TODO: add meta info
  Internals::Logger.crash("Actor crashed!", exception)
  shutdown ExitEvent.new(behavior_proxy, exception)
rescue => ex
  Internals::Logger.crash("Actor#handle_crash CRASHED!", ex)
end

#handle_message(message) ⇒ Object

Handle standard low-priority messages



288
289
290
291
292
293
294
295
# File 'lib/celluloid/actor.rb', line 288

def handle_message(message)
  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Metrics/LineLength, Style/GlobalVars
  Internals::Logger.debug "Discarded message (unhandled): #{message}" if !@handlers.handle_message(message) && !@receivers.handle_message(message) && $CELLULOID_DEBUG
  # rubocop:enable Metrics/LineLength, Style/GlobalVars

  message
end

#handle_system_event(event) ⇒ Object

Handle high-priority system event messages



4
5
6
7
8
9
10
11
12
13
# File 'lib/celluloid/system_events.rb', line 4

def handle_system_event(event)
  if handler = SystemEvent.handle(event.class)
    send(handler, event)
  else
    # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
    # rubocop:disable Style/GlobalVars
    Internals::Logger.debug "Discarded message (unhandled): #{message}" if $CELLULOID_DEBUG
    # rubocop:enable Style/GlobalVars
  end
end

#kill(actor) ⇒ Object

Forcibly kill a given actor



89
90
91
92
# File 'lib/celluloid/actor.rb', line 89

def kill(actor)
  actor.thread.kill
  actor.mailbox.shutdown if actor.mailbox.alive?
end

#linking_request(receiver, type) ⇒ Object

Perform a linking request with another actor



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/celluloid/actor.rb', line 184

def linking_request(receiver, type)
  Celluloid.exclusive do
    receiver.mailbox << LinkingRequest.new(Actor.current, type)
    system_events = []

    Timers::Wait.for(LINKING_TIMEOUT) do |remaining|
      begin
        message = @mailbox.receive(remaining) do |msg|
          msg.is_a?(LinkingResponse) &&
            msg.actor.mailbox.address == receiver.mailbox.address &&
            msg.type == type
        end
      rescue TaskTimeout
        next # IO reactor did something, no message in queue yet.
      end

      if message.instance_of? LinkingResponse
        # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
        # rubocop:disable Style/GlobalVars
        Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING
        # rubocop:enable Style/GlobalVars
        system_events.each { |ev| @mailbox << ev }
        return
      elsif message.is_a? SystemEvent
        # Queue up pending system events to be processed after we've successfully linked
        system_events << message
      else raise "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent."
      end
    end

    raise TaskTimeout, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded with receiver: #{receiver}"
  end
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message



234
235
236
237
238
239
240
241
# File 'lib/celluloid/actor.rb', line 234

def receive(timeout = nil, &block)
  loop do
    message = @receivers.receive(timeout, &block)
    return message unless message.is_a?(SystemEvent)

    handle_system_event(message)
  end
end

#runObject

Run the actor loop



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/celluloid/actor.rb', line 152

def run
  while @running
    begin
      @timers.wait do |interval|
        interval = 0 if interval && interval < 0

        if message = @mailbox.check(interval)
          handle_message(message)

          break unless @running
        end
      end
    rescue MailboxShutdown
      @running = false
    rescue MailboxDead
      # TODO: not tests (but fails occasionally in tests)
      @running = false
    end
  end

  shutdown
rescue ::Exception => ex
  handle_crash(ex)
  raise unless ex.is_a?(StandardError) || ex.is_a?(Celluloid::Interruption)
end

#setup_threadObject



146
147
148
149
# File 'lib/celluloid/actor.rb', line 146

def setup_thread
  Thread.current[:celluloid_actor]   = self
  Thread.current[:celluloid_mailbox] = @mailbox
end

#shutdown(exit_event = ExitEvent.new(behavior_proxy)) ⇒ Object

Handle cleaning up this actor after it exits



311
312
313
314
315
316
317
# File 'lib/celluloid/actor.rb', line 311

def shutdown(exit_event = ExitEvent.new(behavior_proxy))
  @behavior.shutdown
  cleanup exit_event
ensure
  Thread.current[:celluloid_actor]   = nil
  Thread.current[:celluloid_mailbox] = nil
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods



219
220
221
# File 'lib/celluloid/actor.rb', line 219

def signal(name, value = nil)
  @signals.broadcast name, value
end

#sleep(interval) ⇒ Object

Sleep for the given amount of time



282
283
284
285
# File 'lib/celluloid/actor.rb', line 282

def sleep(interval)
  sleeper = Sleeper.new(@timers, interval)
  Celluloid.suspend(:sleeping, sleeper)
end

#startObject



127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/celluloid/actor.rb', line 127

def start
  @running = true
  @thread = Internals::ThreadHandle.new(@actor_system, :actor) do
    setup_thread
    run
  end

  @proxy = Proxy::Actor.new(@mailbox, @thread)

  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  Celluloid::Probe.actor_created(self) if $CELLULOID_MONITORING
  # rubocop:enable Style/GlobalVars
end

#task(task_type, meta = nil) ⇒ Object

Run a method inside a task unless it’s exclusive



344
345
346
347
348
349
350
351
352
# File 'lib/celluloid/actor.rb', line 344

def task(task_type, meta = nil)
  @task_class.new(task_type, meta) do
    if @exclusive
      Celluloid.exclusive { yield }
    else
      yield
    end
  end.resume
end

#terminateObject

Terminate this actor



179
180
181
# File 'lib/celluloid/actor.rb', line 179

def terminate
  @running = false
end

#timeout(duration) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/celluloid/actor.rb', line 253

def timeout(duration)
  bt = caller
  task = Task.current
  timer = @timers.after(duration) do
    exception = TaskTimeout.new("execution expired")
    exception.set_backtrace bt
    task.resume exception
  end
  yield
ensure
  timer.cancel if timer
end

#wait(name) ⇒ Object

Wait for the given signal



224
225
226
# File 'lib/celluloid/actor.rb', line 224

def wait(name)
  @signals.wait name
end