Class: Concurrent::Actor::Core

Inherits:
Synchronization::LockableObject
  • Object
Includes:
Concern::Logging, TypeCheck
Defined in:
lib/concurrent-ruby-edge/concurrent/actor/core.rb

Overview

Note:

The whole class should be considered private. A user should use Contexts and References only.

Note:

devel: the core should not block on anything. For example, it cannot wait for children to terminate, because that would eat up all threads in the task pool and deadlock.

Core of the actor.

Instance Attribute Summary

Instance Method Summary

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initialize(opts = {}, &block) ⇒ Core

Returns a new instance of Core.

Parameters:

  • block (Proc)

    for class instantiation

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • name (String)
  • actor_class (Context)

    a class to be instantiated defining Actor’s behaviour

  • args (Array<Object>)

    arguments for actor_class instantiation

  • executor (Executor)

    default is `global_io_executor`

  • link (true, false)

    atomically link the actor to its parent (default: true)

  • reference (Class)

    a custom descendant of Reference to use

  • behaviour_definition (Array<Array(Behaviour::Abstract, Array<Object>)>)

    an array of pairs, each consisting of a behaviour class and its args; see Behaviour.basic_behaviour_definition

  • initialized (ResolvableFuture, nil)

    if present, it is fulfilled or rejected once Concurrent::Actor::Context initialization finishes

  • parent (Reference, nil)

    **private api** parent of the actor (the one spawning it)

  • logger (Proc, nil)

    a proc accepting (level, progname, message = nil, &block) params; it can be used to hook the actor instance to any logging system, see Concern::Logging
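
As an illustration of the `logger` option, here is a minimal sketch of a proc with the parameter list described above. It assumes Ruby's standard `Logger` as the backend; the names `sink`, `backend`, `actor_logger` and the `/demo` progname are hypothetical and not part of the library.

```ruby
require 'logger'
require 'stringio'

# Hypothetical sink; any IO or logging backend could stand in here.
sink = StringIO.new
backend = Logger.new(sink)

# A proc with the (level, progname, message = nil, &block) shape described above.
# The block form lets the message be built lazily.
actor_logger = lambda do |level, progname, message = nil, &block|
  text = message || (block && block.call)
  backend.add(level, text, progname)
end

# Direct calls, only to show both calling styles.
actor_logger.call(Logger::INFO, '/demo', 'started')
actor_logger.call(Logger::WARN, '/demo') { 'built lazily' }
```

Such a proc would presumably be passed as the `logger:` entry of the options hash.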



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 50

def initialize(opts = {}, &block)
  super(&nil)
  synchronize { ns_initialize(opts, &block) }
end

Instance Attribute Details

#actor_class ⇒ Context (readonly)

A subclass of AbstractContext representing Actor’s behaviour.

Returns:

  • (Context)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#behaviour_definition ⇒ Object (readonly)



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def behaviour_definition
  @behaviour_definition
end

#context ⇒ Object (readonly)



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def context
  @context
end

#context_class ⇒ Object (readonly)



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def context_class
  @context_class
end

#executor ⇒ Executor (readonly)

Executor which is used to process messages.

Returns:

  • (Executor)


# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#name ⇒ String (readonly)

The name of the actor instance. It should be unique (not enforced). It allows easier orientation between actor instances.

Returns:

  • (String)


# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#path ⇒ String (readonly)

Path of this actor. It is used for easier orientation and logging. The path is constructed recursively with `parent.path + self.name` up to Concurrent::Actor.root, e.g. `/an_actor/its_child`.

Returns:

  • (String)


# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition
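
The recursive path construction can be sketched with a small stand-in. `Node` is hypothetical, and the sketch assumes that the root is named `/` and that segments are joined with `File.join`; the description above only states `parent.path + self.name`.

```ruby
# Stand-in for an actor core; only name and parent matter for this sketch.
Node = Struct.new(:name, :parent) do
  # Builds the path recursively: the root contributes its own name,
  # every other node appends its name to its parent's path.
  def path
    parent ? File.join(parent.path, name) : name
  end
end

root  = Node.new('/', nil)          # assumed root name
actor = Node.new('an_actor', root)
child = Node.new('its_child', actor)

puts child.path   # prints /an_actor/its_child
```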

#reference ⇒ Reference (readonly)

Reference to this actor which can be safely passed around.

Returns:

  • (Reference)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def reference
  @reference
end

Instance Method Details

#add_child(child) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 74

def add_child(child)
  guard!
  Type! child, Reference
  @children.add child
  nil
end

#allocate_context ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 150

def allocate_context
  @context = @context_class.allocate
end

#behaviour(behaviour_class) ⇒ Behaviour::Abstract?

Returns the behaviour instance registered for behaviour_class, or nil if there is none.

Parameters:

  • behaviour_class (Class)

Returns:

  • (Behaviour::Abstract, nil)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 138

def behaviour(behaviour_class)
  @behaviours[behaviour_class]
end

#behaviour!(behaviour_class) ⇒ Behaviour::Abstract

Returns the behaviour instance registered for behaviour_class.

Parameters:

  • behaviour_class (Class)

Returns:

  • (Behaviour::Abstract)

Raises:

  • (KeyError)

    when no behaviour is registered for behaviour_class



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 145

def behaviour!(behaviour_class)
  @behaviours.fetch behaviour_class
end
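
The difference between `#behaviour` and `#behaviour!` comes down to `Hash#[]` versus `Hash#fetch`. A minimal sketch, assuming the registry is a plain Hash keyed by behaviour class; `Pausing` and `Linking` are hypothetical placeholder classes, not the library's own.

```ruby
# Hypothetical behaviour classes standing in for Behaviour::Abstract subclasses.
class Pausing; end
class Linking; end

# Assumed shape of the registry: behaviour class => behaviour instance.
behaviours = { Pausing => Pausing.new }

# Hash#[] mirrors #behaviour: a missing key yields nil.
found   = behaviours[Pausing]
missing = behaviours[Linking]

# Hash#fetch mirrors #behaviour!: a missing key raises KeyError.
error = begin
  behaviours.fetch(Linking)
rescue KeyError => e
  e
end
```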

#broadcast(public, event) ⇒ Object



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 131

def broadcast(public, event)
  log(DEBUG) { "event: #{event.inspect} (#{public ? 'public' : 'private'})" }
  @first_behaviour.on_event(public, event)
end

#build_context ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 155

def build_context
  @context.send :initialize_core, self
  @context.send :initialize, *@args, &@block
end
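
`#allocate_context` and `#build_context` together form a two-phase construction: the object is allocated first, the core is attached, and only then does the regular constructor run. The following sketch shows the idea with a hypothetical `DemoContext` class; it is an illustration under that assumption, not the library's context implementation.

```ruby
# Hypothetical context class; initialize_core is modelled on the call shown above.
class DemoContext
  attr_reader :core, :greeting

  def initialize(greeting)
    # By the time this runs, the core has already been attached.
    @greeting = "#{greeting} from #{core}"
  end

  private

  def initialize_core(core)
    @core = core
  end
end

# Step 1: allocate without running initialize.
context = DemoContext.allocate
# Step 2: attach the core first, then run the normal constructor.
context.send(:initialize_core, :demo_core)
context.send(:initialize, 'hello')

puts context.greeting   # prints hello from demo_core
```

One apparent benefit of this ordering is that the user-defined constructor can already rely on the core being present.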

#children ⇒ Array<Reference>

Returns an array of child actors.

Returns:

  • (Array<Reference>)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 68

def children
  guard!
  @children.to_a
end
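
Because `#children` returns `@children.to_a`, callers receive a snapshot rather than the live collection. A small sketch of that behaviour, assuming the collection is a `Set` (the `add`, `delete` and `to_a` calls above are consistent with one, but the type is an assumption here); the symbols stand in for References.

```ruby
require 'set'

# Assumed internal collection; symbols stand in for child References.
children = Set.new
children.add(:child_a)
children.add(:child_a)   # a duplicate is ignored by Set
children.add(:child_b)

# to_a returns a new Array, so changing it does not touch the Set.
snapshot = children.to_a
snapshot << :intruder

children.delete(:child_a)
```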

#dead_letter_routing ⇒ Object



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 63

def dead_letter_routing
  @context.dead_letter_routing
end

#guard! ⇒ Object

Ensures that the call is made from inside this actor, i.e. that Actor.current equals this actor's reference; raises otherwise.



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 102

def guard!
  unless Actor.current == reference
    raise "can be called only inside actor #{reference} but was #{Actor.current}"
  end
end
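
The guard can be tried out in isolation with a stdlib-only sketch. It assumes the current actor is tracked in the `:__current_actor__` thread-local slot, as in the `#schedule_execution` source on this page; `current_actor` and the `:my_actor` symbol are hypothetical stand-ins for `Actor.current` and a Reference.

```ruby
# Hypothetical stand-in for Actor.current, backed by a thread-local slot.
def current_actor
  Thread.current[:__current_actor__]
end

# Raises unless the calling code runs "inside" the given actor reference.
def guard!(reference)
  unless current_actor == reference
    raise "can be called only inside actor #{reference} but was #{current_actor.inspect}"
  end
end

# Outside of any actor the guard rejects the call.
outside = begin
  guard!(:my_actor)
  :allowed
rescue RuntimeError
  :rejected
end

# With the slot set to the matching reference the guard passes.
Thread.current[:__current_actor__] = :my_actor
inside = begin
  guard!(:my_actor)
  :allowed
rescue RuntimeError
  :rejected
ensure
  Thread.current[:__current_actor__] = nil
end
```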

#log(level, message = nil, &block) ⇒ Object



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 108

def log(level, message = nil, &block)
  super level, @path, message, &block
end

#on_envelope(envelope) ⇒ Object

Is executed by Reference to schedule processing of new messages. It can also be called from alternative Reference implementations.

Parameters:

  • envelope (Envelope)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 92

def on_envelope(envelope)
  log(DEBUG) { "is  #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" }
  schedule_execution do
    log(DEBUG) { "was #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender} - processing" }
    process_envelope envelope
  end
  nil
end

#parent ⇒ Reference?

The parent actor. When an actor is spawned, Concurrent::Actor.current becomes its parent. When an actor is spawned from a thread outside of any actor (Concurrent::Actor.current is nil), Concurrent::Actor.root is assigned.

Returns:

  • (Reference, nil)

# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 58

def parent
  @parent_core && @parent_core.reference
end

#process_envelope(envelope) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 161

def process_envelope(envelope)
  @first_behaviour.on_envelope envelope
end

#remove_child(child) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 82

def remove_child(child)
  guard!
  Type! child, Reference
  @children.delete child
  nil
end

#schedule_execution ⇒ Object

Schedules blocks to be executed on the executor sequentially; sets Actor.current while each block runs.



# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 114

def schedule_execution
  @serialized_execution.post(@executor) do
    synchronize do
      begin
        Thread.current[:__current_actor__] = reference
        yield
      rescue => e
        log FATAL, e
      ensure
        Thread.current[:__current_actor__] = nil
      end
    end
  end

  nil
end
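
The pattern above (run blocks one at a time, mark the current actor while a block runs, log failures instead of propagating them) can be sketched with the standard library only. `SerialRunner` is a hypothetical stand-in built on a `Queue` and a single worker thread; it is not how the library's serialized execution is implemented, and it writes failures to stderr instead of the actor log.

```ruby
# A stdlib-only stand-in: one worker thread drains a queue, so blocks run
# one at a time and in submission order.
class SerialRunner
  def initialize(reference)
    @reference = reference
    @queue = Queue.new
    @worker = Thread.new do
      while (job = @queue.pop)   # a nil job ends the loop
        begin
          Thread.current[:__current_actor__] = @reference
          job.call
        rescue => e
          warn "job failed: #{e.message}"
        ensure
          Thread.current[:__current_actor__] = nil
        end
      end
    end
  end

  # Enqueues a block and returns immediately.
  def schedule(&block)
    @queue << block
    nil
  end

  # Stops the worker after all queued jobs have run.
  def shutdown
    @queue << nil
    @worker.join
  end
end

seen = []
runner = SerialRunner.new(:demo_actor)
3.times { |i| runner.schedule { seen << [i, Thread.current[:__current_actor__]] } }
runner.schedule { raise 'boom' }          # reported, does not stop later jobs
runner.schedule { seen << :after_error }
runner.shutdown
```

As in the original method, a failing block is reported and the slot is reset in `ensure`, so later blocks still run with a clean state.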