Class: Concurrent::Actor::Utils::Balancer
- Inherits: RestartingContext
  - Object
  - AbstractContext
  - RestartingContext
  - Concurrent::Actor::Utils::Balancer
- Defined in: lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb
Overview
Distributes messages between subscribed actors. Each actor receives only one message and is then unsubscribed; it must resubscribe when it is ready to receive the next message. If no worker is subscribed, the balancer buffers incoming messages until one subscribes.
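The bookkeeping described above can be sketched in plain Ruby. This is a toy, single-threaded model rather than the actor itself: the class name `ToyBalancer`, its `subscribe` and `push` methods, and the `delivered` log are hypothetical names introduced for illustration, and delivery is recorded in an array instead of being redirected to a real actor.

```ruby
# Toy, single-threaded model of the Balancer's bookkeeping.
# ToyBalancer and its delivery log are hypothetical, not part of concurrent-ruby.
class ToyBalancer
  attr_reader :delivered

  def initialize
    @receivers = []  # workers waiting for exactly one message each
    @buffer    = []  # messages waiting for a worker
    @delivered = []  # [worker, message] pairs, in delivery order
  end

  def subscribe(worker)
    @receivers << worker
    distribute
  end

  def push(message)
    @buffer << message
    distribute
  end

  private

  # Pair the oldest waiting worker with the oldest buffered message.
  def distribute
    until @receivers.empty? || @buffer.empty?
      @delivered << [@receivers.shift, @buffer.shift]
    end
  end
end

balancer = ToyBalancer.new
balancer.push(:job1)        # no worker yet, so the message is buffered
balancer.push(:job2)
balancer.subscribe(:alice)  # alice receives :job1 and is unsubscribed
balancer.subscribe(:bob)    # bob receives :job2
balancer.push(:job3)        # buffered until some worker resubscribes
balancer.subscribe(:alice)  # alice resubscribes and receives :job3
```

Because a worker is removed from the waiting list as soon as it is handed a message, a slow worker never accumulates a backlog; work goes to whichever worker asks for it next.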
Instance Attribute Summary
Attributes inherited from AbstractContext
Instance Method Summary
- #distribute ⇒ Object
- #initialize ⇒ Balancer (constructor): A new instance of Balancer.
- #on_message(message) ⇒ Object
Methods inherited from RestartingContext
Methods inherited from AbstractContext
#ask, #behaviour_definition, #dead_letter_routing, #default_executor, #default_reference_class, #envelope, #on_envelope, #on_event, #pass, spawn, spawn!, #tell
Methods included from InternalDelegations
#behaviour, #behaviour!, #children, #context, #dead_letter_routing, #log, #redirect, #terminate!, #terminated?
Methods included from PublicDelegations
#context_class, #executor, #name, #parent, #path, #reference
Methods included from TypeCheck
#Child!, #Child?, #Match!, #Match?, #Type!, #Type?
Constructor Details
#initialize ⇒ Balancer
Returns a new instance of Balancer.
    # File 'lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb', line 13
    def initialize
      @receivers = []
      @buffer = []
    end
Instance Method Details
#distribute ⇒ Object
    # File 'lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb', line 37
    def distribute
      while !@receivers.empty? && !@buffer.empty?
        redirect @receivers.shift, @buffer.shift
      end
    end
#on_message(message) ⇒ Object
    # File 'lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb', line 18
    def on_message(message)
      command, who = message
      case command
      when :subscribe
        @receivers << (who || envelope.sender)
        distribute
        true
      when :unsubscribe
        @receivers.delete(who || envelope.sender)
        true
      when :subscribed?
        @receivers.include?(who || envelope.sender)
      else
        @buffer << envelope
        distribute
        Behaviour::MESSAGE_PROCESSED
      end
    end
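The `command, who = message` step relies on Ruby's multiple assignment, so a command can be sent either as a bare symbol or as a two-element array naming the actor. The snippet below isolates that step in plain Ruby; `split_message` is a hypothetical helper written for this illustration and is not part of the library.

```ruby
# Hypothetical helper that mirrors only the destructuring step of on_message.
def split_message(message)
  command, who = message
  [command, who]
end

split_message(:subscribe)             # command is :subscribe, who is nil
split_message([:subscribe, :worker])  # command is :subscribe, who is :worker
split_message("some work")            # command is "some work", who is nil
```

When `who` is nil, the method shown above falls back to `envelope.sender`, so an actor can subscribe itself by sending just `:subscribe`. Anything whose first element is not one of the three commands reaches the `else` branch and is buffered as work.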