Class: Concurrent::ProcessingActor
- Inherits:
-
Synchronization::Object
- Object
- Synchronization::Object
- Concurrent::ProcessingActor
- Defined in:
- lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb
Overview
A new implementation of actor which also simulates the process, therefore it can be used in the same way as Erlang’s actors but without occupying thread. A tens of thousands ProcessingActors can run at the same time sharing a thread pool.
Class Method Summary collapse
-
.act(*args, &process) ⇒ ProcessingActor
Creates an actor.
-
.act_listening(channel, *args) {|actor, *args| ... } ⇒ ProcessingActor
Creates an actor listening to a specified channel (mailbox).
Instance Method Summary collapse
-
#ask_op(answer = Promises.resolvable_future, &message_provider) ⇒ Object
actor.ask2 { |a| [:count, a] }.
-
#mailbox ⇒ Promises::Channel
Actor’s mailbox.
-
#receive(channel = mailbox) ⇒ Object
# Receives a message when available, used in the actor’s process.
-
#tell!(message) ⇒ self
Tells a message to the actor.
-
#tell_op(message) ⇒ Promises::Future(ProcessingActor)
Tells a message to the actor.
-
#termination ⇒ Promises::Future(Object)
A future which is resolved when the actor ends its processing.
- #to_ary ⇒ Object
-
#to_s ⇒ String
(also: #inspect)
String representation.
Class Method Details
.act(*args, &process) ⇒ ProcessingActor
Creates an actor.
54 55 56 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 54 def self.act(*args, &process) act_listening Promises::Channel.new, *args, &process end |
.act_listening(channel, *args) {|actor, *args| ... } ⇒ ProcessingActor
Creates an actor listening to a specified channel (mailbox).
67 68 69 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 67 def self.act_listening(channel, *args, &process) ProcessingActor.new channel, *args, &process end |
Instance Method Details
#ask_op(answer = Promises.resolvable_future, &message_provider) ⇒ Object
actor.ask2 { |a| [:count, a] }
157 158 159 160 161 162 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 157 def ask_op(answer = Promises.resolvable_future, &) # TODO (pitr-ch 12-Dec-2018): is it ok to let the answers be unanswered when the actor terminates tell_op(.call(answer)).then(answer) { |_, a| a } # answer.chain { |v| [true, v] } | @Terminated.then end |
#mailbox ⇒ Promises::Channel
Returns actor’s mailbox.
29 30 31 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 29 def mailbox @Mailbox end |
#receive(channel = mailbox) ⇒ Object
# Receives a message when available, used in the actor’s process. # @return [Promises::Future(Object)] a future which will be fulfilled with a message from # mailbox when it is available. def receive(*channels)
channels = [@Mailbox] if channels.empty?
Promises::Channel.select(*channels)
# TODO (pitr-ch 27-Dec-2016): support patterns
# - put any received message aside if it does not match
# - on each receive call check the messages put aside
# - track where the message came from, cannot later receive m form other channel only because it matches
end
83 84 85 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 83 def receive(channel = mailbox) channel.pop_op end |
#tell!(message) ⇒ self
Tells a message to the actor. May block current thread if the mailbox is full. #tell_op is a better option since it does not block. It’s usually used to integrate with threading code.
99 100 101 102 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 99 def tell!() @Mailbox.push() self end |
#tell_op(message) ⇒ Promises::Future(ProcessingActor)
Tells a message to the actor.
108 109 110 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 108 def tell_op() @Mailbox.push_op().then(self) { |_ch, actor| actor } end |
#termination ⇒ Promises::Future(Object)
Returns a future which is resolved when the actor ends its processing. It can either be fulfilled with a value when actor ends normally or rejected with a reason (exception) when actor fails.
36 37 38 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 36 def termination @Terminated.with_hidden_resolvable end |
#to_ary ⇒ Object
171 172 173 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 171 def to_ary [@Mailbox, @Terminated] end |
#to_s ⇒ String Also known as: inspect
Returns string representation.
165 166 167 |
# File 'lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb', line 165 def to_s format '%s termination: %s>', super[0..-2], termination.state end |