Class: CZTop::Actor
- Inherits:
- Object
- Object → CZTop::Actor
- Extended by:
- HasFFIDelegate::ClassMethods
- Includes:
- CZMQ::FFI, HasFFIDelegate, PolymorphicZsockMethods, SendReceiveMethods, ZsockOptions
- Defined in:
- lib/cztop/actor.rb
Overview
Represents a CZMQ::FFI::Zactor.
About Thread-Safety
The instance methods of this class are thread-safe, so it is safe to call #<<, #request, or even #terminate from different threads. Caution: use only these methods to communicate with the low-level zactor. Don't call Message#send_to directly to send a message to an Actor instance, as that wouldn't be thread-safe.
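The locking pattern described above can be illustrated with a small stdlib-only sketch. This is not CZTop's implementation: `ToyActor`, its `Queue` mailbox, `#stop`, and `ToyActor::DeadActorError` are hypothetical stand-ins for the real actor, the zactor pipe, and DeadActorError. The point is only that every public method takes the same mutex and checks a running flag before touching shared state.

```ruby
# Toy model of a thread-safe actor front end (stdlib only).
# ToyActor and its Queue mailbox are hypothetical stand-ins, not CZTop APIs.
class ToyActor
  class DeadActorError < RuntimeError; end

  attr_reader :received

  def initialize
    @mtx = Mutex.new
    @running = true
    @mailbox = Queue.new
    @received = []
    # Handler thread: drains the mailbox until it is closed and empty.
    @thread = Thread.new do
      while (msg = @mailbox.pop)
        @received << msg
      end
    end
  end

  # Callers from any thread go through the same mutex.
  def <<(message)
    @mtx.synchronize do
      raise DeadActorError unless @running

      @mailbox << message
    end
    self
  end

  # Stops the handler thread; returns false if it was already stopped.
  def stop
    @mtx.synchronize do
      return false unless @running

      @running = false
      @mailbox.close
    end
    @thread.join
    true
  end
end

actor = ToyActor.new
senders = 4.times.map do |i|
  Thread.new { 25.times { |n| actor << "sender #{i}, message #{n}" } }
end
senders.each(&:join)
actor.stop
puts actor.received.size # prints 100
```

Sending after `stop` raises `ToyActor::DeadActorError`, which mirrors the `raise DeadActorError unless @running` guard visible in the method sources further down this page.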
About termination
Actors should be terminated explicitly, either by calling #terminate from the current process or sending them the TERMINATE command (from outside). Not terminating them explicitly might make the process block at exit.
Defined Under Namespace
Classes: DeadActorError
Constant Summary
- TERMINATE = '$TERM'
  the command which causes an actor handler to terminate
- SEND_TIMEOUT = 20
  timeout to use when sending the actor a message
Constants included from SendReceiveMethods
SendReceiveMethods::FD_TIMEOUT, SendReceiveMethods::JIFFY
Instance Attribute Summary
- #exception ⇒ Exception (readonly)
  The exception that crashed this actor, if any.
Attributes included from HasFFIDelegate
Instance Method Summary
- #<<(message) ⇒ self
  Send a message to the actor.
- #crashed? ⇒ Boolean
  Whether this actor has crashed.
- #dead? ⇒ Boolean
  Whether this actor is dead (terminated or crashed).
- #initialize(callback = nil, c_args = nil) {|message, pipe| ... } ⇒ Actor (constructor)
  Creates a new actor.
- #receive ⇒ Message
  Receive a message from the actor.
- #request(message) ⇒ Message
  Same as #<<, but also waits for a response from the actor and returns it.
- #send_picture(picture, *args) ⇒ void
  Sends a message according to a “picture”.
- #terminate ⇒ Boolean
  Tells the actor to terminate and waits for it.
- #wait ⇒ Integer
  Thread-safe PolymorphicZsockMethods#wait.
Methods included from HasFFIDelegate::ClassMethods
ffi_delegate, from_ffi_delegate
Methods included from PolymorphicZsockMethods
Methods included from SendReceiveMethods
#read_timeout, #wait_for_fd_signal, #wait_writable, #write_timeout
Methods included from ZsockOptions
#fd, #options, #readable?, #to_io, #writable?
Methods included from HasFFIDelegate
#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #to_ptr
Constructor Details
#initialize(callback = nil, c_args = nil) {|message, pipe| ... } ⇒ Actor
Creates a new actor. Either pass a callback directly or a block. The callback/block will be called for every received message.
In case the given callback is an FFI::Pointer (to a C function), it’s used as-is. It is expected to do the handshake (signal) itself.
    # File 'lib/cztop/actor.rb', line 76
    def initialize(callback = nil, c_args = nil, &handler)
      @running = true
      @mtx = Mutex.new
      @callback = callback || handler
      @callback = shim(@callback) unless @callback.is_a? ::FFI::Pointer
      ffi_delegate = Zactor.new(@callback, c_args)
      attach_ffi_delegate(ffi_delegate)
      options.sndtimeo = SEND_TIMEOUT # see #<<
    end
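The "callback or block" choice in the constructor can be illustrated in isolation. The sketch below is a stdlib-only toy, not CZTop code: `ToyHandlerHost` and `#deliver` are hypothetical names, the "pipe" is just an Array, and the ArgumentError for a missing handler is an addition made for the sketch rather than documented behavior.

```ruby
# Hypothetical stand-in showing the "callback or block" constructor pattern.
class ToyHandlerHost
  def initialize(callback = nil, &handler)
    # An explicit callback wins; otherwise fall back to the block.
    @callback = callback || handler
    # Added for the sketch only: fail early when neither was given.
    raise ArgumentError, 'no callback or block given' unless @callback
  end

  # Invokes the handler once for a message, passing the message and a
  # "pipe" (here an Array that collects replies).
  def deliver(message, pipe)
    @callback.call(message, pipe)
  end
end

replies = []
from_block = ToyHandlerHost.new { |message, pipe| pipe << message.upcase }
from_proc  = ToyHandlerHost.new(->(message, pipe) { pipe << message.reverse })
from_block.deliver('ping', replies)
from_proc.deliver('ping', replies)
p replies # prints ["PING", "gnip"]
```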
Instance Attribute Details
#exception ⇒ Exception (readonly)
Returns the exception that crashed this actor, if any.
    # File 'lib/cztop/actor.rb', line 60
    def exception
      @exception
    end
Instance Method Details
#<<(message) ⇒ self
Send a message to the actor.
Note: Normally this method is asynchronous, but if the message is TERMINATE, it blocks until the actor is terminated.

    # File 'lib/cztop/actor.rb', line 96
    def <<(message)
      message = Message.coerce(message)

      if TERMINATE == message[0]
        # NOTE: can't just send this to the actor. The sender might call
        # #terminate immediately, which most likely causes a hang due to race
        # conditions.
        terminate
      else
        begin
          @mtx.synchronize do
            raise DeadActorError unless @running

            message.send_to(self)
          end
        rescue IO::EAGAINWaitWritable, IO::TimeoutError
          # The sndtimeo has been reached.
          #
          # This should fix the race condition (mainly on JRuby) between
          # @running not being set to false yet but the actor handler already
          # terminating and thus not able to receive messages anymore.
          #
          # This shouldn't result in an infinite loop, since it'll stop as
          # soon as @running is set to false by #signal_shimmed_handler_death,
          # at least when using a Ruby handler.
          #
          # In case of a C function handler, it MUST NOT crash and only
          # terminate when being sent the {TERMINATE} message using #terminate (so
          # #await_handler_death can set @running to false).
          retry
        end
      end

      self
    end
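The retry-on-send-timeout loop is easier to follow in a reduced form. The sketch below is a stdlib-only simulation, assuming a pipe that times out a fixed number of times: `ToySender`, `SendTimeout` (standing in for the IO timeout errors), and the `timeouts:` and `dies_after:` parameters are all hypothetical. It shows why the loop ends: each `retry` re-enters the `begin` block, so the running flag is checked again before every send.

```ruby
# Simulation of the retry-on-send-timeout loop (not CZTop code).
class ToySender
  class DeadActorError < RuntimeError; end
  class SendTimeout < StandardError; end # stand-in for the IO timeout errors

  attr_reader :attempts, :delivered

  # timeouts:   number of sends that time out before one is accepted
  # dies_after: attempt number at which the handler is marked dead (or nil)
  def initialize(timeouts:, dies_after: nil)
    @mtx = Mutex.new
    @running = true
    @timeouts = timeouts
    @dies_after = dies_after
    @attempts = 0
    @delivered = []
  end

  def <<(message)
    begin
      @mtx.synchronize do
        raise DeadActorError unless @running

        raw_send(message)
      end
    rescue SendTimeout
      retry # re-enters the begin block, so @running is checked again
    end
    self
  end

  private

  # Simulated pipe write that times out for the first @timeouts attempts.
  def raw_send(message)
    @attempts += 1
    @running = false if @dies_after && @attempts >= @dies_after
    raise SendTimeout if @attempts <= @timeouts

    @delivered << message
  end
end

ok = ToySender.new(timeouts: 3)
ok << 'hello'
puts ok.attempts # prints 4

doomed = ToySender.new(timeouts: 100, dies_after: 2)
begin
  doomed << 'hello'
rescue ToySender::DeadActorError
  puts "gave up after #{doomed.attempts} attempts" # prints "gave up after 2 attempts"
end
```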
#crashed? ⇒ Boolean
Returns whether this actor has crashed.
    # File 'lib/cztop/actor.rb', line 218
    def crashed?
      !!@exception # if set, it has crashed
    end
#dead? ⇒ Boolean
Returns whether this actor is dead (terminated or crashed).
    # File 'lib/cztop/actor.rb', line 211
    def dead?
      !@running
    end
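How #exception, #crashed? and #dead? relate can be shown with a toy. The sketch below assumes that a wrapper around the handler records any exception and clears the running flag when the handler ends; that wrapper, the `ToyCrashTracker` class, and `#join` are assumptions made for illustration, not CZTop's actual shim.

```ruby
# Toy model of crash tracking (stdlib only, hypothetical names).
class ToyCrashTracker
  attr_reader :exception

  def initialize(&handler)
    @running = true
    @exception = nil
    @thread = Thread.new do
      begin
        handler.call
      rescue StandardError => e
        @exception = e   # remember what crashed the handler
      ensure
        @running = false # dead either way: finished or crashed
      end
    end
  end

  # Waits for the handler thread to finish.
  def join
    @thread.join
    self
  end

  def crashed?
    !!@exception
  end

  def dead?
    !@running
  end
end

clean   = ToyCrashTracker.new { :done }.join
crashed = ToyCrashTracker.new { raise ArgumentError, 'boom' }.join
p [clean.dead?, clean.crashed?]     # prints [true, false]
p [crashed.dead?, crashed.crashed?] # prints [true, true]
p crashed.exception.message         # prints "boom"
```

In this model a crashed actor is always dead, while a dead actor has crashed only if an exception was recorded.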
#receive ⇒ Message
Receive a message from the actor.
    # File 'lib/cztop/actor.rb', line 136
    def receive
      @mtx.synchronize do
        raise DeadActorError unless @running

        super
      end
    end
#request(message) ⇒ Message
Same as #<<, but also waits for a response from the actor and returns it.
    # File 'lib/cztop/actor.rb', line 150
    def request(message)
      @mtx.synchronize do
        raise DeadActorError unless @running

        message = Message.coerce(message)
        raise ArgumentError, 'use #terminate' if TERMINATE == message[0]

        message.send_to(self)
        Message.receive_from(self)
      end
    rescue IO::EAGAINWaitWritable
      # same as in #<<
      retry
    end
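Holding the mutex across both the send and the receive is what keeps each caller paired with its own reply. The stdlib-only sketch below demonstrates that idea with two queues; `ToyEchoActor`, its "re: " reply format, and `#stop` are hypothetical and not part of CZTop.

```ruby
# Toy request/reply actor (stdlib only, hypothetical names).
class ToyEchoActor
  def initialize
    @mtx = Mutex.new
    @inbox = Queue.new
    @outbox = Queue.new
    # Handler thread: answers each message until the inbox is closed.
    @thread = Thread.new do
      while (msg = @inbox.pop)
        @outbox << "re: #{msg}"
      end
    end
  end

  # Send and receive happen under one lock, so replies cannot be
  # picked up by a different caller.
  def request(message)
    @mtx.synchronize do
      @inbox << message
      @outbox.pop
    end
  end

  def stop
    @inbox.close
    @thread.join
  end
end

actor = ToyEchoActor.new
replies = 8.times.map { |i| Thread.new { actor.request("q#{i}") } }.map(&:value)
actor.stop
p replies == 8.times.map { |i| "re: q#{i}" } # prints true
```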
#send_picture(picture, *args) ⇒ void
Sends a message according to a “picture”.
Note: Mainly added for Beacon. If implemented there, it wouldn't be thread-safe, and it isn't useful enough to add to SendReceiveMethods.
This method returns an undefined value.

    # File 'lib/cztop/actor.rb', line 175
    def send_picture(picture, *args)
      @mtx.synchronize do
        raise DeadActorError unless @running

        Zsock.send(ffi_delegate, picture, *args)
      end
    end
#terminate ⇒ Boolean
Tells the actor to terminate and waits for it. Idempotent.
    # File 'lib/cztop/actor.rb', line 196
    def terminate
      @mtx.synchronize do
        return false unless @running

        Message.new(TERMINATE).send_to(self)
        await_handler_death
        true
      end
    rescue IO::EAGAINWaitWritable, IO::TimeoutError
      # same as in #<<
      retry
    end
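The idempotent behavior (true on the first call, false afterwards) can be reproduced with a small stdlib-only model. `ToyTerminable` is a hypothetical class; joining the handler thread stands in for waiting on the handler's death, and a `Queue` stands in for the actor pipe.

```ruby
# Toy model of an idempotent terminate (stdlib only, hypothetical names).
class ToyTerminable
  TERMINATE = '$TERM'

  def initialize
    @mtx = Mutex.new
    @running = true
    @inbox = Queue.new
    # Handler thread: runs until it receives the TERMINATE command.
    @thread = Thread.new do
      loop do
        break if @inbox.pop == TERMINATE
      end
    end
  end

  def terminate
    @mtx.synchronize do
      return false unless @running # already terminated: nothing to do

      @inbox << TERMINATE
      @thread.join                 # wait for the handler to finish
      @running = false
      true
    end
  end
end

t = ToyTerminable.new
p t.terminate # prints true
p t.terminate # prints false
```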
#wait ⇒ Integer
Thread-safe PolymorphicZsockMethods#wait.
    # File 'lib/cztop/actor.rb', line 186
    def wait
      @mtx.synchronize do
        super
      end
    end