Class: CZTop::Actor
- Inherits: Object (hierarchy: Object, CZTop::Actor)
- Extended by: HasFFIDelegate::ClassMethods
- Includes: CZMQ::FFI, HasFFIDelegate, PolymorphicZsockMethods, SendReceiveMethods, ZsockOptions
- Defined in: lib/cztop/actor.rb
Overview
Represents a CZMQ::FFI::Zactor.
About Thread-Safety
The instance methods of this class are thread-safe, so it’s safe to call #<<, #request or even #terminate from different threads. Caution: Use only these methods to communicate with the low-level zactor. Don’t call Message#send_to directly with an Actor instance as the destination, as that wouldn’t be thread-safe.
About termination
Actors should be terminated explicitly, either by calling #terminate from the current process or sending them the TERMINATE command (from outside). Not terminating them explicitly might make the process block at exit.
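The contract described above (mutex-guarded sends, an error once the actor is dead, and an explicit, repeatable terminate) can be modelled with the standard library alone. The following is a minimal sketch, not CZTop code: `ToyActor`, its in-memory `Queue` inbox, and its nested `DeadActorError` are hypothetical stand-ins chosen for illustration.

```ruby
# Toy model only: ToyActor is a hypothetical stand-in, not CZTop::Actor.
class ToyActor
  class DeadActorError < RuntimeError; end

  TERMINATE = '$TERM'

  def initialize(&handler)
    @running = true
    @mtx     = Mutex.new
    @inbox   = Queue.new
    # The handler runs in its own thread until it sees TERMINATE.
    @thread  = Thread.new do
      while (msg = @inbox.pop) != TERMINATE
        handler.call(msg)
      end
    end
  end

  # Thread-safe, asynchronous send; raises once the actor is dead.
  def <<(msg)
    @mtx.synchronize do
      raise DeadActorError unless @running

      @inbox << msg
    end
    self
  end

  # Returns true on the first call and false on every later call.
  def terminate
    @mtx.synchronize do
      return false unless @running

      @inbox << TERMINATE
      @thread.join
      @running = false
      true
    end
  end

  def dead?
    !@running
  end
end

received = Queue.new
actor = ToyActor.new { |msg| received << msg }

# Several threads may send concurrently.
4.times.map { |i| Thread.new { actor << "job #{i}" } }.each(&:join)

first  = actor.terminate
second = actor.terminate
```

Because every public method takes the same mutex and checks the running flag first, a late sender gets an error instead of blocking on a handler that will never read again.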
Defined Under Namespace
Classes: DeadActorError
Constant Summary
- TERMINATE = '$TERM'
  the command which causes an actor handler to terminate
- SEND_TIMEOUT = 20
  timeout to use when sending the actor a message (in milliseconds; it is assigned to the sndtimeo socket option)
Constants included from SendReceiveMethods
SendReceiveMethods::FD_TIMEOUT, SendReceiveMethods::JIFFY
Instance Attribute Summary
- #exception ⇒ Exception (readonly)
  The exception that crashed this actor, if any.
Attributes included from HasFFIDelegate
Instance Method Summary
- #<<(message) ⇒ self
  Send a message to the actor.
- #await_handler_death ⇒ void (private)
  Waits for the C or Ruby handler to die.
- #crashed? ⇒ Boolean
  Whether this actor has crashed.
- #dead? ⇒ Boolean
  Whether this actor is dead (terminated or crashed).
- #handler_shimmed? ⇒ Boolean (private)
  Whether the handler is a Ruby object, like a simple block (as opposed to a FFI::Pointer to a C function).
- #initialize(callback = nil, c_args = nil) {|message, pipe| ... } ⇒ Actor (constructor)
  Creates a new actor.
- #next_message ⇒ Message (private)
  Receives the next message even across any interrupts.
- #process_messages(handler) {|message, pipe| ... } ⇒ Object (private)
  Successively receive messages that were sent to the actor and yield them to the given handler to process them.
- #receive ⇒ Message
  Receive a message from the actor.
- #request(message) ⇒ Message
  Same as #<<, but also waits for a response from the actor and returns it.
- #send_picture(picture, *args) ⇒ void
  Sends a message according to a “picture”.
- #shim(handler) ⇒ FFI::Function (private)
  Shims the given handler.
- #signal_shimmed_handler_death ⇒ void (private)
  Creates a new thread that will signal the definitive termination of the Ruby handler.
- #terminate ⇒ Boolean
  Tells the actor to terminate and waits for it.
- #wait ⇒ Integer
  Thread-safe PolymorphicZsockMethods#wait.
Methods included from HasFFIDelegate::ClassMethods
ffi_delegate, from_ffi_delegate
Methods included from PolymorphicZsockMethods
Methods included from SendReceiveMethods
#now, #read_timeout, #wait_for_fd_signal, #wait_writable, #write_timeout
Methods included from ZsockOptions
#fd, #options, #readable?, #to_io, #writable?
Methods included from HasFFIDelegate
#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #raise_zmq_err, #to_ptr
Constructor Details
#initialize(callback = nil, c_args = nil) {|message, pipe| ... } ⇒ Actor
Creates a new actor. Either pass a callback directly or a block. The callback/block will be called for every received message.
In case the given callback is an FFI::Pointer (to a C function), it’s used as-is. It is expected to do the handshake (signal) itself.
    # File 'lib/cztop/actor.rb', line 76
    def initialize(callback = nil, c_args = nil, &handler)
      @running         = true
      @mtx             = Mutex.new
      @callback        = callback || handler
      @callback        = shim(@callback) unless @callback.is_a? ::FFI::Pointer
      ffi_delegate     = Zactor.new(@callback, c_args)
      attach_ffi_delegate(ffi_delegate)
      options.sndtimeo = SEND_TIMEOUT # see #<<
    end
Instance Attribute Details
#exception ⇒ Exception (readonly)
Returns the exception that crashed this actor, if any.
    # File 'lib/cztop/actor.rb', line 60
    def exception
      @exception
    end
Instance Method Details
#<<(message) ⇒ self
Send a message to the actor.
Note: Normally this method is asynchronous, but if the message is TERMINATE, it blocks until the actor is terminated.
    # File 'lib/cztop/actor.rb', line 96
    def <<(message)
      message = Message.coerce(message)

      if TERMINATE == message[0]
        # NOTE: can't just send this to the actor. The sender might call
        # #terminate immediately, which most likely causes a hang due to race
        # conditions.
        terminate
      else
        begin
          @mtx.synchronize do
            raise DeadActorError unless @running

            message.send_to(self)
          end
        rescue IO::EAGAINWaitWritable, IO::TimeoutError
          # The sndtimeo has been reached.
          #
          # This should fix the race condition (mainly on JRuby) between
          # @running not being set to false yet but the actor handler already
          # terminating and thus not able to receive messages anymore.
          #
          # This shouldn't result in an infinite loop, since it'll stop as
          # soon as @running is set to false by #signal_shimmed_handler_death,
          # at least when using a Ruby handler.
          #
          # In case of a C function handler, it MUST NOT crash and only
          # terminate when being sent the {TERMINATE} message using #terminate (so
          # #await_handler_death can set @running to false).
          retry
        end
      end

      self
    end
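The comments in the listing above argue that retrying after a send timeout cannot loop forever, because the liveness check eventually fails. That reasoning can be sketched without any socket. Everything here is hypothetical: `SendTimeout` stands in for `IO::EAGAINWaitWritable`, `DeadActor` for `DeadActorError`, and `state[:running]` for `@running`.

```ruby
# Illustration only: no socket is involved, and both error classes are
# hypothetical stand-ins for the ones used by the real method.
class SendTimeout < StandardError; end
class DeadActor < StandardError; end

# state[:running] flips to false on the third attempt, as if the handler's
# death had just been recorded by another thread.
def send_with_retry(state)
  raise DeadActor unless state[:running]

  state[:attempts] += 1
  state[:running] = false if state[:attempts] == 3
  raise SendTimeout # the handler no longer reads, so every send times out
rescue SendTimeout
  retry # re-runs the method body, including the liveness check
end

state = { running: true, attempts: 0 }
outcome =
  begin
    send_with_retry(state)
  rescue DeadActor
    :dead_actor_raised
  end
```

The retry loop ends as soon as the flag is cleared: the next pass raises the dead-actor error, which the timeout rescue does not catch.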
#await_handler_death ⇒ void (private)
This method returns an undefined value.
Waits for the C or Ruby handler to die.
    # File 'lib/cztop/actor.rb', line 323
    def await_handler_death
      if handler_shimmed?
        # for Ruby block/Proc object handlers
        @handler_dead_signal.pop

      else
        # for handlers that are passed as C functions, we rely on normal death
        # signal

        # can't use #wait here because of recursive deadlock
        Zsock.wait(ffi_delegate)

        @running = false
      end
    end
#crashed? ⇒ Boolean
Returns whether this actor has crashed.
    # File 'lib/cztop/actor.rb', line 218
    def crashed?
      !!@exception # if set, it has crashed
    end
#dead? ⇒ Boolean
Returns whether this actor is dead (terminated or crashed).
    # File 'lib/cztop/actor.rb', line 211
    def dead?
      !@running
    end
#handler_shimmed? ⇒ Boolean (private)
Returns whether the handler is a Ruby object, like a simple block (as opposed to a FFI::Pointer to a C function).
    # File 'lib/cztop/actor.rb', line 257
    def handler_shimmed?
      !!@handler_thread # if it exists, it's shimmed
    end
#next_message ⇒ Message (private)
Receives the next message even across any interrupts.
    # File 'lib/cztop/actor.rb', line 291
    def next_message
      @pipe.receive
    end
#process_messages(handler) {|message, pipe| ... } ⇒ Object (private)
Successively receive messages that were sent to the actor and yield them to the given handler to process them. The pipe (a Socket::PAIR socket) is also passed to the handler so it can send back the result of a command, if needed.
When a message is TERMINATE, or when waiting for a message is interrupted, execution is aborted and the actor will terminate.
    # File 'lib/cztop/actor.rb', line 274
    def process_messages(handler)
      while true
        begin
          message = next_message
        rescue Interrupt
          break
        else
          break if TERMINATE == message[0]
        end

        handler.call(message, @pipe)
      end
    end
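The receive loop described above can be tried out in isolation with in-memory queues. This is a minimal sketch under stated assumptions: `toy_process_messages` is a hypothetical helper, messages are plain arrays of frames, and two `Queue` objects stand in for the two directions of the PAIR pipe.

```ruby
TERMINATE = '$TERM'

# Hypothetical stand-in for the loop: `inbox` delivers messages to the
# handler and `pipe` carries the handler's replies back.
def toy_process_messages(inbox, pipe, handler)
  loop do
    begin
      message = inbox.pop
    rescue Interrupt
      break # an interrupted wait ends the loop
    end
    break if TERMINATE == message[0] # first frame is the command

    handler.call(message, pipe)
  end
end

inbox = Queue.new
pipe  = Queue.new
inbox << ['PING'] << ['ECHO', 'hi'] << [TERMINATE] << ['never seen']

# The handler replies with the lower-cased frames of each message.
handler = ->(message, reply_pipe) { reply_pipe << message.map(&:downcase) }
toy_process_messages(inbox, pipe, handler)

replies = []
replies << pipe.pop until pipe.empty?
```

Anything queued after the TERMINATE command stays unread, which matches the description: the loop stops before handing that command to the handler.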
#receive ⇒ Message
Receive a message from the actor.
    # File 'lib/cztop/actor.rb', line 136
    def receive
      @mtx.synchronize do
        raise DeadActorError unless @running

        super
      end
    end
#request(message) ⇒ Message
Same as #<<, but also waits for a response from the actor and returns it.
    # File 'lib/cztop/actor.rb', line 150
    def request(message)
      @mtx.synchronize do
        raise DeadActorError unless @running

        message = Message.coerce(message)
        raise ArgumentError, 'use #terminate' if TERMINATE == message[0]

        message.send_to(self)
        Message.receive_from(self)
      end
    rescue IO::EAGAINWaitWritable
      # same as in #<<
      retry
    end
#send_picture(picture, *args) ⇒ void
Sends a message according to a “picture”.
Note: Mainly added for Beacon. If it were implemented there, it wouldn’t be thread-safe, and it isn’t useful enough to add to SendReceiveMethods.
This method returns an undefined value.
    # File 'lib/cztop/actor.rb', line 175
    def send_picture(picture, *args)
      @mtx.synchronize do
        raise DeadActorError unless @running

        Zsock.send(ffi_delegate, picture, *args)
      end
    end
#shim(handler) ⇒ FFI::Function (private)
Shims the given handler. The shim is used to do the handshake, to run #process_messages, and to ensure we’re notified when the handler has terminated.
    # File 'lib/cztop/actor.rb', line 233
    def shim(handler)
      raise ArgumentError, 'invalid handler' unless handler.respond_to?(:call)

      @handler_thread      = nil
      @handler_dead_signal = Queue.new # used for signaling

      Zactor.fn do |pipe_delegate, _args|
        @mtx.synchronize do
          @handler_thread = Thread.current
          @pipe           = Socket::PAIR.from_ffi_delegate(pipe_delegate)
          @pipe.signal # handshake, so zactor_new() returns
        end

        process_messages(handler)
      rescue Exception => e
        @exception = e
      ensure
        signal_shimmed_handler_death
      end
    end
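The three duties of the shim (handshake first, then run the handler, and always report its death, recording any exception on the way) follow a plain begin/rescue/ensure shape. The sketch below models only that ordering: `toy_shim` and the `events` log are hypothetical, and no FFI or socket is involved.

```ruby
# Hypothetical helper: wraps a callable and records what happens, in order.
def toy_shim(handler, events)
  raise ArgumentError, 'invalid handler' unless handler.respond_to?(:call)

  lambda do |inbox|
    begin
      events << :handshake           # done before any message is processed
      handler.call(inbox)
    rescue Exception => e            # deliberately broad, as in the listing
      events << [:crashed, e.class]
    ensure
      events << :death_signalled     # runs whether or not the handler crashed
    end
  end
end

events = []
toy_shim(->(_inbox) { raise 'boom' }, events).call(Queue.new)

rejected =
  begin
    toy_shim(42, [])
    false
  rescue ArgumentError
    true
  end
```

Putting the death notification in `ensure` is what lets the owning object learn about a crash as reliably as about a clean exit.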
#signal_shimmed_handler_death ⇒ void (private)
This method returns an undefined value.
Creates a new thread that will signal the definitive termination of the Ruby handler.
This is needed to avoid the race condition between zactor_destroy(), which will wait for a signal from the handler in case it was able to send the TERMINATE command, and the @callback, which might still not have returned but doesn’t receive any messages anymore.
    # File 'lib/cztop/actor.rb', line 305
    def signal_shimmed_handler_death
      # NOTE: can't just use ConditionVariable, as the signaling code might be
      # run BEFORE the waiting code.

      Thread.new do
        @handler_thread.join

        # NOTE: we do this here and not in #terminate, so it also works when
        # actor isn't terminated using #terminate
        @running = false

        @handler_dead_signal.push(nil)
      end
    end
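The note about ConditionVariable in the listing above comes down to ordering: a condition-variable signal sent before anyone waits is lost, while a value pushed onto a Queue stays there until it is popped. A minimal stdlib-only sketch of the Queue side, with hypothetical variable names and no CZTop involved:

```ruby
# Illustration only: a Queue remembers a signal that is sent before anyone
# waits for it.
dead_signal = Queue.new

handler = Thread.new { :handler_done } # finishes almost immediately

watcher = Thread.new do
  handler.join
  dead_signal.push(nil)                # the signal may well happen first
end
watcher.join                           # force the "signal before wait" order

result = dead_signal.pop               # the waiter still receives the signal
```

Here the pop returns at once even though the push happened earlier, so the waiter cannot miss the handler's death.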
#terminate ⇒ Boolean
Tells the actor to terminate and waits for it. Idempotent.
    # File 'lib/cztop/actor.rb', line 196
    def terminate
      @mtx.synchronize do
        return false unless @running

        Message.new(TERMINATE).send_to(self)
        await_handler_death
        true
      end
    rescue IO::EAGAINWaitWritable, IO::TimeoutError
      # same as in #<<
      retry
    end
#wait ⇒ Integer
Thread-safe PolymorphicZsockMethods#wait.
    # File 'lib/cztop/actor.rb', line 186
    def wait
      @mtx.synchronize do
        super
      end
    end