Module: Concurrent::Actor::Behaviour
- Defined in:
- lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb
Overview
Actors have a modular architecture, achieved by combining a light core with a chain of behaviours. Each message or internal event propagates through the chain, allowing each behaviour to react according to its responsibility.
- Behaviour::Linking:
> Links the actor to other actors and sends the actor’s events to them, such as `:terminated`, `:paused`, `:resumed`, and errors. The linked actor needs to handle those messages.
listener = AdHoc.spawn name: :listener do
  lambda do |message|
    case message
    when Reference
      if message.ask!(:linked?)
        message << :unlink
      else
        message << :link
      end
    else
      puts "got event #{message.inspect} from #{envelope.sender}"
    end
  end
end

an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do
  lambda { |message| raise 'failed' }
end

# link the actor
listener.ask(an_actor).wait
an_actor.ask(:fail).wait
# unlink the actor
listener.ask(an_actor).wait
an_actor.ask(:fail).wait
an_actor << :terminate!
This produces only two events; the other events happened after unlinking.
got event #<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
got event :reset from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
- Behaviour::Awaits:
> Accepts `:await` messages, which allow a caller to wait until the actor has processed all previously sent messages.
actor << :a << :b
actor.ask(:await).wait # blocks until :a and :b are processed
- Behaviour::Pausing:
> Allows actors to be paused on errors. While paused, all arriving messages are collected and processed after the actor is resumed or reset. Resume simply continues with the next message. Reset also reinitializes the context.
- Behaviour::Supervising:
> Handles supervised actors. Handle configures what to do with a failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy sets :one_for_one (restarts just the failed actor) or :one_for_all (restarts all child actors).
- Behaviour::ExecutesContext:
> Delegates messages and events to the AbstractContext instance.
- Behaviour::ErrorsOnUnknownMessage:
> Simply fails when a message arrives here. It’s usually the last behaviour.
- Behaviour::Termination:
> Handles actor termination. Waits until all of the actor’s children are terminated; this can be configured on behaviour initialization.
- Behaviour::RemovesChild:
> Removes terminated children.
If needed, new behaviours can be added, or existing ones removed, to get the required behaviour.
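The chain idea can be illustrated with a minimal plain-Ruby sketch. The classes below (`SketchBehaviour`, `Pings`, `Counts`, `FailsOnUnknown`) and the `on_message`/`pass` methods are hypothetical stand-ins invented for this illustration; they are not the library's classes or API, and the sketch runs without the gem. Each link either handles a message or hands it to the next link, and the last link fails on anything that reaches it.

```ruby
# Hypothetical stand-ins; these are NOT the concurrent-ruby-edge classes.
class SketchBehaviour
  def initialize(subsequent)
    @subsequent = subsequent
  end

  # Default: do nothing and hand the message to the next behaviour.
  def on_message(message)
    pass(message)
  end

  def pass(message)
    @subsequent.on_message(message)
  end
end

# Answers :ping itself and passes everything else on.
class Pings < SketchBehaviour
  def on_message(message)
    message == :ping ? :pong : pass(message)
  end
end

# Counts every message it sees, then passes it on unchanged.
class Counts < SketchBehaviour
  attr_reader :seen

  def initialize(subsequent)
    super
    @seen = 0
  end

  def on_message(message)
    @seen += 1
    pass(message)
  end
end

# Last link: anything that reaches it is treated as unknown.
class FailsOnUnknown < SketchBehaviour
  def on_message(message)
    raise ArgumentError, "unknown message #{message.inspect}"
  end
end

last    = FailsOnUnknown.new(nil)
pings   = Pings.new(last)
counter = Counts.new(pings) # head of the chain

result = counter.on_message(:ping)
```

Adding or removing a behaviour in this sketch only changes which objects are linked together; the other links stay untouched.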
- Context uses an Array of behaviours and their construction parameters:
[[Behaviour::SetResults, :terminate!],
 [Behaviour::RemovesChild],
 [Behaviour::Termination],
 [Behaviour::Linking],
 [Behaviour::Awaits],
 [Behaviour::ExecutesContext],
 [Behaviour::ErrorsOnUnknownMessage]]
- RestartingContext uses an Array of behaviours and their construction parameters:
[[Behaviour::SetResults, :pause!],
 [Behaviour::RemovesChild],
 [Behaviour::Termination],
 [Behaviour::Linking],
 [Behaviour::Pausing],
 [Behaviour::Supervising, :reset!, :one_for_one],
 [Behaviour::Awaits],
 [Behaviour::ExecutesContext],
 [Behaviour::ErrorsOnUnknownMessage]]
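A definition array of this shape can be turned into a linked chain by walking it backwards, so that each behaviour is constructed with a reference to the one that follows it. The sketch below shows one plausible way to do that in plain Ruby; the `Step` class and its subclasses are hypothetical, and this page does not show how the library itself instantiates a definition, so treat it as an illustration only. Note that entries may be either `[klass, *args]` arrays or a bare class.

```ruby
# Hypothetical stand-ins for behaviour classes; not the library's own.
class Step
  attr_reader :subsequent, :args

  def initialize(subsequent, *args)
    @subsequent = subsequent
    @args       = args
  end
end

class SetsResult < Step; end
class Awaiting   < Step; end
class Executes   < Step; end

# Mixed entry styles: with parameters, wrapped in an Array, and bare.
definition = [[SetsResult, :terminate!], [Awaiting], Executes]

# Walk the definition backwards so each behaviour receives the one after it.
head = definition.reverse.reduce(nil) do |following, (klass, *args)|
  klass.new(following, *args)
end

# Collect the classes in chain order, starting from the head.
order = []
node  = head
while node
  order << node.class
  node = node.subsequent
end
```

The block parameter `(klass, *args)` destructures array entries and leaves a bare class untouched, which is why both entry styles work here.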
Defined Under Namespace
Classes: Abstract, Awaits, Buffer, ErrorsOnUnknownMessage, ExecutesContext, Linking, Pausing, RemovesChild, SetResults, Supervising, Termination
Constant Summary
- MESSAGE_PROCESSED = ::Object.new
Class Method Summary
- .base(on_error) ⇒ Object
- .basic_behaviour_definition ⇒ Object
  Array of behaviours and their construction parameters.
- .linking ⇒ Object
- .restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) ⇒ Object
  Array of behaviours and their construction parameters.
- .supervised ⇒ Object
- .supervising(handle = :reset!, strategy = :one_for_one) ⇒ Object
- .user_messages ⇒ Object
Class Method Details
.base(on_error) ⇒ Object
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 105
def self.base(on_error)
  [[SetResults, on_error],
   # has to be before Termination to be able to remove children from terminated actor
   RemovesChild,
   Termination]
end
.basic_behaviour_definition ⇒ Object
Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :terminate!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 77
def self.basic_behaviour_definition
  [*base(:terminate!),
   *linking,
   *user_messages]
end
.linking ⇒ Object
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 113
def self.linking
  [Linking]
end
.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) ⇒ Object
Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :pause!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Pausing],
[Behaviour::Supervising, :reset!, :one_for_one],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 96
def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one)
  [*base(:pause!),
   *linking,
   *supervised,
   *supervising(handle, strategy),
   *user_messages]
end
.supervised ⇒ Object
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 118
def self.supervised
  [Pausing]
end
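`supervised` contributes only `Pausing`, which the overview describes as collecting messages while the actor is paused and processing them after a resume or reset. The following plain-Ruby sketch illustrates that difference between resume and reset. `PausableCounter` and all of its methods are hypothetical and exist only for this illustration; the real behaviour works on actor envelopes and events, not on a counter.

```ruby
# Hypothetical stand-in; not the library's Pausing behaviour.
class PausableCounter
  attr_reader :total

  def initialize
    @buffered = []
    @paused   = false
    @total    = 0
  end

  def paused?
    @paused
  end

  # Buffer while paused, otherwise process immediately.
  def receive(number)
    if @paused
      @buffered << number
    else
      process(number)
    end
    self
  end

  # Continue with the buffered messages, keeping the current state.
  def resume!
    @paused = false
    drain
  end

  # Reinitialize the state first, then continue with the buffered messages.
  def reset!
    @total  = 0
    @paused = false
    drain
  end

  private

  def process(number)
    raise ArgumentError, "not a number: #{number.inspect}" unless number.is_a?(Integer)
    @total += number
  rescue ArgumentError
    @paused = true # the failing message is dropped, later ones are buffered
  end

  def drain
    process(@buffered.shift) until @paused || @buffered.empty?
    self
  end
end

counter = PausableCounter.new
counter.receive(1).receive(:oops).receive(2).receive(3)
total_while_paused = counter.total
counter.resume!
total_after_resume = counter.total
```

With `reset!` instead of `resume!`, the total accumulated before the failure would be discarded before the buffered messages are processed.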
.supervising(handle = :reset!, strategy = :one_for_one) ⇒ Object
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 123
def self.supervising(handle = :reset!, strategy = :one_for_one)
  [[Behaviour::Supervising, handle, strategy]]
end
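The `strategy` parameter selects between `:one_for_one` and `:one_for_all`, which the overview describes as acting on just the failed actor or on all child actors. A small plain-Ruby sketch of that selection follows. The helper `affected_children` and the child names are hypothetical and are not part of the library.

```ruby
# Hypothetical helper: which children a supervisor would act on
# when one of them fails, depending on the strategy.
def affected_children(children, failed, strategy)
  case strategy
  when :one_for_one then [failed]      # only the failed child
  when :one_for_all then children.dup  # every child of the supervisor
  else raise ArgumentError, "unknown strategy #{strategy.inspect}"
  end
end

children = [:worker_a, :worker_b, :worker_c]

only_failed = affected_children(children, :worker_b, :one_for_one)
everyone    = affected_children(children, :worker_b, :one_for_all)
```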
.user_messages ⇒ Object
# File 'lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb', line 128
def self.user_messages
  [Awaits,
   ExecutesContext,
   ErrorsOnUnknownMessage]
end
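The class methods above compose the two definitions out of smaller arrays using the splat operator. The sketch below mirrors that composition in plain Ruby so the resulting order can be inspected without the gem. The module `DefinitionSketch` is hypothetical, and symbols stand in for the behaviour classes; only the method names and argument defaults are taken from this page.

```ruby
# Symbols stand in for the behaviour classes so the sketch runs without the gem.
module DefinitionSketch
  def self.base(on_error)
    [[:SetResults, on_error], :RemovesChild, :Termination]
  end

  def self.linking
    [:Linking]
  end

  def self.supervised
    [:Pausing]
  end

  def self.supervising(handle = :reset!, strategy = :one_for_one)
    [[:Supervising, handle, strategy]]
  end

  def self.user_messages
    [:Awaits, :ExecutesContext, :ErrorsOnUnknownMessage]
  end

  # Same composition as basic_behaviour_definition.
  def self.basic
    [*base(:terminate!), *linking, *user_messages]
  end

  # Same composition as restarting_behaviour_definition.
  def self.restarting(handle = :reset!, strategy = :one_for_one)
    [*base(:pause!), *linking, *supervised, *supervising(handle, strategy), *user_messages]
  end
end

basic      = DefinitionSketch.basic
restarting = DefinitionSketch.restarting(:restart!, :one_for_all)
```

Because each helper returns a flat array, a custom definition can be assembled the same way, for example by splatting the helpers and inserting an extra entry between them.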