Module: Concurrent::Actor::Behaviour

Defined in:
lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb

Overview

Actors have a modular architecture, achieved by combining a light core with a chain of behaviours. Each message or internal event propagates through the chain, allowing each behaviour to react according to its responsibility.

  • Linking:

    > Links the actor to other actors and sends the actor’s events to them, such as `:terminated`, `:paused`, `:resumed`, and errors. A linked actor needs to handle those messages.

    listener = AdHoc.spawn name: :listener do
      lambda do |message|
        case message
        when Reference
          if message.ask!(:linked?)
            message << :unlink
          else
            message << :link
          end
        else
          puts "got event #{message.inspect} from #{envelope.sender}"
        end
      end
    end
    
    an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do
      lambda { |message| raise 'failed'}
    end
    
    # link the actor
    listener.ask(an_actor).wait
    an_actor.ask(:fail).wait
    # unlink the actor
    listener.ask(an_actor).wait
    an_actor.ask(:fail).wait
    an_actor << :terminate!
    

    This produces only two events; the other events happened after unlinking.

    got event #<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
    got event :reset from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
    

  • Awaits:

    > Accepts `:await` messages, which allow waiting until the actor has processed all previously sent messages.

    actor << :a << :b
    actor.ask(:await).wait # blocks until :a and :b are processed
    

  • Pausing:

    > Allows actors to be paused on errors. While an actor is paused, all arriving messages are collected and then processed after the actor is resumed or reset. Resume simply continues with the next message. Reset also reinitializes the context.

  • Supervising:

    > Handles supervised actors. Handle configures what to do with a failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy is either :one_for_one (restarts just the failed actor) or :one_for_all (restarts all child actors).

  • ExecutesContext:

    > Delegates messages and events to AbstractContext instance.

  • ErrorsOnUnknownMessage:

    > Simply fails when a message arrives here. It’s usually the last behaviour.

  • Termination:

    > Handles actor termination. Waits until all of the actor’s children are terminated; this can be configured on behaviour initialization.

  • RemovesChild:

    > Removes terminated children.
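
The Pausing entry in the list above can be pictured with a small stand-alone model. This is only a sketch of the described semantics (buffer while paused, keep state on resume, rebuild state on reset), not the library's implementation; `PausableCounter` and its methods are hypothetical names.

```ruby
# Stand-alone model of pause/resume/reset; PausableCounter is a hypothetical
# name and none of this is the library's implementation.
class PausableCounter
  attr_reader :total

  def initialize
    @total  = 0     # stands in for the actor's context state
    @paused = false
    @buffer = []    # messages that arrived while paused
  end

  def tell(message)
    return @buffer << message if @paused

    process(message)
  end

  # Continue with the buffered messages; the state is kept.
  def resume!
    @paused = false
    drain
  end

  # Rebuild the state first, then continue with the buffered messages.
  def reset!
    @total = 0
    @paused = false
    drain
  end

  private

  def process(message)
    @total += Integer(message)
  rescue ArgumentError, TypeError
    @paused = true # an error pauses the model instead of terminating it
  end

  def drain
    tell(@buffer.shift) until @buffer.empty? || @paused
  end
end

resumed = PausableCounter.new
resumed.tell(1)
resumed.tell('oops') # fails, so the counter pauses
resumed.tell(2)      # buffered while paused
resumed.resume!      # state kept, buffered message processed

reset = PausableCounter.new
reset.tell(1)
reset.tell('oops')
reset.tell(2)
reset.reset!         # state rebuilt, then buffered message processed
```

In this model the failed message is dropped rather than retried, which matches "continues with the next message" in the description.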

If needed, new behaviours can be added or existing ones removed to get the required behaviour.
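
The chain idea from the overview can be sketched in plain Ruby. The classes below (`SketchBehaviour`, `Upcase`, `Logging`, `FailOnUnknown`) and the `build_chain` helper are hypothetical stand-ins, not the library's API; the sketch only assumes that each behaviour either handles a message or passes it to the next one, and that a definition is a list of `[class, *construction_params]` entries.

```ruby
# Simplified stand-alone model of a behaviour chain; not the library's classes.
class SketchBehaviour
  def initialize(subsequent)
    @subsequent = subsequent # next behaviour in the chain, nil for the last one
  end

  # Default: do nothing and hand the message to the next behaviour.
  def on_message(message)
    pass(message)
  end

  def pass(message)
    @subsequent.on_message(message)
  end
end

# Handles String messages itself; everything else travels further down the chain.
class Upcase < SketchBehaviour
  def on_message(message)
    message.is_a?(String) ? message.upcase : pass(message)
  end
end

# A newly added behaviour: records every message, then passes it on unchanged.
class Logging < SketchBehaviour
  def initialize(subsequent, log)
    super(subsequent)
    @log = log
  end

  def on_message(message)
    @log << message
    pass(message)
  end
end

# Last link: plays the role of ErrorsOnUnknownMessage by failing on anything
# that reaches it.
class FailOnUnknown < SketchBehaviour
  def on_message(message)
    raise ArgumentError, "unknown message #{message.inspect}"
  end
end

# Build the chain back to front from [class, *construction_params] entries.
def build_chain(definition)
  definition.reverse.reduce(nil) do |subsequent, (klass, *params)|
    klass.new(subsequent, *params)
  end
end

log = []
chain = build_chain([[Logging, log], [Upcase], [FailOnUnknown]])
reply = chain.on_message('hi') # handled by Upcase after Logging recorded it
```

Adding or removing a behaviour in this model means adding or removing one entry in the array passed to `build_chain`; the order of entries decides which behaviour sees a message first.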

Defined Under Namespace

Classes: Abstract, Awaits, Buffer, ErrorsOnUnknownMessage, ExecutesContext, Linking, Pausing, RemovesChild, SetResults, Supervising, Termination

Constant Summary

MESSAGE_PROCESSED =
::Object.new
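
`MESSAGE_PROCESSED` is a bare `Object` instance. This page does not say how it is used; a plausible reading is that it acts as a unique sentinel a behaviour can return to mean "handled, no result to report". The sketch below shows that idiom in general terms, with hypothetical names `PROCESSED` and `handle`.

```ruby
# Hypothetical sentinel, built the same way as MESSAGE_PROCESSED.
PROCESSED = ::Object.new

# Returns a real result for :ping and the sentinel for messages that need no reply.
def handle(message)
  case message
  when :ping then :pong
  when :await then PROCESSED
  else nil # nil is a legitimate result, so nil cannot double as the marker
  end
end

results = %i[ping await other].map { |m| handle(m) }
# Identity comparison separates the sentinel from every ordinary value, nil included.
with_result = results.reject { |r| r.equal?(PROCESSED) }
```

A fresh `Object` is equal only to itself, so no value returned by user code can be mistaken for the marker.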

Class Method Details

.base(on_error) ⇒ Object

See Also:

  • its source code


# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 105

def self.base(on_error)
  [[SetResults, on_error],
   # has to be before Termination to be able to remove children from terminated actor
   RemovesChild,
   Termination]
end

.basic_behaviour_definition ⇒ Object

Array of behaviours and their construction parameters.

[[Behaviour::SetResults, :terminate!],
 [Behaviour::RemovesChild],
 [Behaviour::Termination],
 [Behaviour::Linking],
 [Behaviour::Awaits],
 [Behaviour::ExecutesContext],
 [Behaviour::ErrorsOnUnknownMessage]]

See Also:

  • its source code


# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 77

def self.basic_behaviour_definition
  [*base(:terminate!),
   *linking,
   *user_messages]
end
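
To see what the splat composition returns, the helper methods can be replayed against empty stand-in classes. `BehaviourSketch` is a hypothetical module so the example runs without concurrent-ruby-edge; the method bodies are copied from the source listings on this page, and the `Array()` normalization at the end is an assumption about how the documented shape can be obtained, not a statement about the library internals.

```ruby
# Empty stand-ins so the sketch runs on its own; the real classes live under
# Concurrent::Actor::Behaviour.
module BehaviourSketch
  %i[SetResults RemovesChild Termination Linking
     Awaits ExecutesContext ErrorsOnUnknownMessage].each do |name|
    const_set(name, Class.new)
  end

  # Method bodies below mirror the source listings on this page.
  def self.base(on_error)
    [[SetResults, on_error], RemovesChild, Termination]
  end

  def self.linking
    [Linking]
  end

  def self.user_messages
    [Awaits, ExecutesContext, ErrorsOnUnknownMessage]
  end

  def self.basic_behaviour_definition
    [*base(:terminate!), *linking, *user_messages]
  end
end

definition = BehaviourSketch.basic_behaviour_definition
# Entries without parameters come back as bare classes; wrapping each entry
# with Array() gives the uniform [class, *params] shape of the listing above.
normalized = definition.map { |entry| Array(entry) }
```

Only `SetResults` carries a construction parameter here, so it is the only entry that is already an array before normalization.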

.linking ⇒ Object

See Also:

  • its source code


# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 113

def self.linking
  [Linking]
end

.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) ⇒ Object

Array of behaviours and their construction parameters.

[[Behaviour::SetResults, :pause!],
 [Behaviour::RemovesChild],
 [Behaviour::Termination],
 [Behaviour::Linking],
 [Behaviour::Pausing],
 [Behaviour::Supervising, :reset!, :one_for_one],
 [Behaviour::Awaits],
 [Behaviour::ExecutesContext],
 [Behaviour::ErrorsOnUnknownMessage]]

See Also:

  • its source code


# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 96

def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
  [*base(:pause!),
   *linking,
   *supervised,
   *supervising(handle, strategy),
   *user_messages]
end

.supervised ⇒ Object

See Also:

  • its source code


# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 118

def self.supervised
  [Pausing]
end

.supervising(handle = :reset!, strategy = :one_for_one) ⇒ Object

See Also:

  • its source code


# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 123

def self.supervising(handle = :reset!, strategy = :one_for_one)
  [[Behaviour::Supervising, handle, strategy]]
end

.user_messages ⇒ Object

See Also:

  • its source code


# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 128

def self.user_messages
  [Awaits,
   ExecutesContext,
   ErrorsOnUnknownMessage]
end