Class: CZTop::Actor

Inherits:
Object
Extended by:
HasFFIDelegate::ClassMethods
Includes:
CZMQ::FFI, HasFFIDelegate, PolymorphicZsockMethods, SendReceiveMethods, ZsockOptions
Defined in:
lib/cztop/actor.rb

Overview

Represents a CZMQ::FFI::Zactor.

About Thread-Safety

The instance methods of this class are thread-safe, so it’s safe to call #<<, #request or even #terminate from different threads. Caution: Use only these methods to communicate with the low-level zactor. Don’t call Message#send_to directly to send a message to an Actor instance, as that bypasses the actor’s mutex and wouldn’t be thread-safe.
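The idea behind the thread-safety guarantee can be sketched in plain Ruby, without CZTop. The class below is a hypothetical stand-in (SerializedMailbox is not part of the library); it only illustrates how a single mutex around each send lets several threads share one chainable #<<.

```ruby
# Hypothetical stand-in for an actor: NOT part of CZTop.
class SerializedMailbox
  def initialize
    @mtx      = Mutex.new
    @messages = []
  end

  # Chainable, like Actor#<<; the mutex serializes concurrent senders.
  def <<(message)
    @mtx.synchronize { @messages << message }
    self
  end

  def size
    @mtx.synchronize { @messages.size }
  end
end

mailbox = SerializedMailbox.new
threads = 4.times.map do |i|
  Thread.new { 100.times { |n| mailbox << "t#{i}-#{n}" } }
end
threads.each(&:join)
```

Every send goes through the same lock, which is why sending through any other path (such as Message#send_to) would sidestep the guarantee.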

About termination

Actors should be terminated explicitly, either by calling #terminate from the current process or sending them the TERMINATE command (from outside). Not terminating them explicitly might make the process block at exit.

Examples:

Simple Actor with Ruby block

result = ""
a = CZTop::Actor.new do |msg, pipe|
  case msg[0]
  when "foo"
    pipe << "bar"
  when "append"
    result << msg[1].to_s
  when "result"
    pipe << result
  end
end
a.request("foo")[0] #=> "bar"
a.request("foo")[0] #=> "bar"
a << ["append", "baz"] << ["append", "baz"]
a.request("result")[0] #=> "bazbaz"

Defined Under Namespace

Classes: DeadActorError

Constant Summary

TERMINATE = '$TERM'

    the command which causes an actor handler to terminate

SEND_TIMEOUT = 20

    timeout (in milliseconds) to use when sending the actor a message; applied as options.sndtimeo in the constructor

Constants included from SendReceiveMethods

SendReceiveMethods::FD_TIMEOUT, SendReceiveMethods::JIFFY

Instance Attribute Summary

Attributes included from HasFFIDelegate

#ffi_delegate

Instance Method Summary

Methods included from HasFFIDelegate::ClassMethods

ffi_delegate, from_ffi_delegate

Methods included from PolymorphicZsockMethods

#set_unbounded, #signal

Methods included from SendReceiveMethods

#now, #read_timeout, #wait_for_fd_signal, #wait_writable, #write_timeout

Methods included from ZsockOptions

#fd, #options, #readable?, #to_io, #writable?

Methods included from HasFFIDelegate

#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #raise_zmq_err, #to_ptr

Constructor Details

#initialize(callback = nil, c_args = nil) {|message, pipe| ... } ⇒ Actor

Creates a new actor. Either pass a callback directly or a block. The callback/block will be called for every received message.

In case the given callback is an FFI::Pointer (to a C function), it’s used as-is. It is expected to do the handshake (signal) itself.

Parameters:

  • callback (FFI::Pointer, Proc, #call) (defaults to: nil)

    pointer to a C function or just anything callable

  • c_args (FFI::Pointer, nil) (defaults to: nil)

    args, only useful if callback is an FFI::Pointer

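Yield Parameters:

  • message (Message)

    message (e.g. command) received

  • pipe (Socket::PAIR)

    pipe to write back something into the actor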



# File 'lib/cztop/actor.rb', line 76

def initialize(callback = nil, c_args = nil, &handler)
  @running         = true
  @mtx             = Mutex.new
  @callback        = callback || handler
  @callback        = shim(@callback) unless @callback.is_a? ::FFI::Pointer
  ffi_delegate     = Zactor.new(@callback, c_args)

  attach_ffi_delegate(ffi_delegate)
  options.sndtimeo = SEND_TIMEOUT # see #<<
end
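The constructor picks the positional callback over the block (callback || handler), and a non-pointer handler must respond to #call. A minimal sketch of that selection rule, using a hypothetical helper (pick_handler is not part of CZTop) and leaving out the FFI::Pointer case:

```ruby
# Hypothetical helper, not part of CZTop: mirrors `callback || handler`
# followed by the `respond_to?(:call)` check done when shimming.
def pick_handler(callback = nil, &block)
  chosen = callback || block
  raise ArgumentError, 'invalid handler' unless chosen.respond_to?(:call)

  chosen
end

from_block  = pick_handler { |msg| msg.upcase }
from_lambda = pick_handler(->(msg) { msg.reverse })
# When both are given, the positional callback wins.
both        = pick_handler(->(msg) { "positional: #{msg}" }) { |msg| "block: #{msg}" }
```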

Instance Attribute Details

#exception ⇒ Exception (readonly)

Returns the exception that crashed this actor, if any.

Returns:

  • (Exception)

    the exception that crashed this actor, if any



# File 'lib/cztop/actor.rb', line 60

def exception
  @exception
end

Instance Method Details

#<<(message) ⇒ self

Note:

Normally this method is asynchronous, but if the message is TERMINATE, it blocks until the actor is terminated.

Send a message to the actor.

Parameters:

  • message (Object)

    message to send to the actor, see Message.coerce

Returns:

  • (self)

    so it’s chainable

Raises:

  • (DeadActorError)

    if the actor is dead (terminated or crashed)



# File 'lib/cztop/actor.rb', line 96

def <<(message)
  message = Message.coerce(message)

  if TERMINATE == message[0]
    # NOTE: can't just send this to the actor. The sender might call
    # #terminate immediately, which most likely causes a hang due to race
    # conditions.
    terminate
  else
    begin
      @mtx.synchronize do
        raise DeadActorError unless @running

        message.send_to(self)
      end
    rescue IO::EAGAINWaitWritable, IO::TimeoutError
      # The sndtimeo has been reached.
      #
      # This should fix the race condition (mainly on JRuby) between
      # @running not being set to false yet but the actor handler already
      # terminating and thus not able to receive messages anymore.
      #
      # This shouldn't result in an infinite loop, since it'll stop as
      # soon as @running is set to false by #signal_shimmed_handler_death,
      # at least when using a Ruby handler.
      #
      # In case of a C function handler, it MUST NOT crash and only
      # terminate when being sent the {TERMINATE} message using #terminate (so
      # #await_handler_death can set @running to false).
      retry
    end
  end

  self
end
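The retry-on-timeout pattern in #<< can be tried out in isolation. The sketch below is plain Ruby with hypothetical names (FlakySender, DeadError and SendTimeout are not CZTop classes); a counter simulates a send that times out a few times before succeeding.

```ruby
# Hypothetical stand-ins; none of these names come from CZTop.
class DeadError < StandardError; end
class SendTimeout < StandardError; end

class FlakySender
  attr_reader :attempts, :delivered

  def initialize(failures)
    @mtx       = Mutex.new
    @running   = true
    @failures  = failures # how many sends time out before one succeeds
    @attempts  = 0
    @delivered = []
  end

  def stop
    @mtx.synchronize { @running = false }
  end

  def <<(message)
    begin
      @mtx.synchronize do
        raise DeadError unless @running

        @attempts += 1
        raise SendTimeout if @attempts <= @failures

        @delivered << message
      end
    rescue SendTimeout
      # The lock is released here, so another thread could flip @running
      # before the next attempt; the retry then ends with DeadError.
      retry
    end
    self
  end
end

sender = FlakySender.new(2)
sender << 'hello'
```

Because the liveness check sits inside the retried block, the loop ends either with a successful send or with the dead-actor error, which is the property the comments in #<< rely on.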

#await_handler_death ⇒ void (private)

This method returns an undefined value.

Waits for the C or Ruby handler to die.



# File 'lib/cztop/actor.rb', line 323

def await_handler_death
  if handler_shimmed?
    # for Ruby block/Proc object handlers
    @handler_dead_signal.pop

  else
    # for handlers that are passed as C functions, we rely on normal death
    # signal

    # can't use #wait here because of recursive deadlock
    Zsock.wait(ffi_delegate)

    @running = false
  end
end

#crashed? ⇒ Boolean

Returns whether this actor has crashed.

Returns:

  • (Boolean)

    whether this actor has crashed

# File 'lib/cztop/actor.rb', line 218

def crashed?
  !!@exception # if set, it has crashed
end

#dead? ⇒ Boolean

Returns whether this actor is dead (terminated or crashed).

Returns:

  • (Boolean)

    whether this actor is dead (terminated or crashed)



# File 'lib/cztop/actor.rb', line 211

def dead?
  !@running
end

#handler_shimmed? ⇒ Boolean (private)

Returns whether the handler is a Ruby object, like a simple block (as opposed to a FFI::Pointer to a C function).

Returns:

  • (Boolean)

    whether the handler is a Ruby object, like a simple block (as opposed to a FFI::Pointer to a C function)



# File 'lib/cztop/actor.rb', line 257

def handler_shimmed?
  !!@handler_thread # if it exists, it's shimmed
end

#next_message ⇒ Message (private)

Receives the next message even across any interrupts.

Returns:

  • (Message)



# File 'lib/cztop/actor.rb', line 291

def next_message
  @pipe.receive
end

#process_messages(handler) {|message, pipe| ... } ⇒ Object (private)

Successively receives messages that were sent to the actor and yields them to the given handler for processing. The pipe (a Socket::PAIR socket) is also passed to the handler so it can send back the result of a command, if needed.

When a message is TERMINATE, or when waiting for a message is interrupted, the loop is aborted and the actor will terminate.

Parameters:

  • handler (Proc, #call)

    the handler used to process messages

Yield Parameters:

  • message (Message)

    message (e.g. command) received

  • pipe (Socket::PAIR)

    pipe to write back something into the actor



# File 'lib/cztop/actor.rb', line 274

def process_messages(handler)
  while true
    begin
      message = next_message
    rescue Interrupt
      break
    else
      break if TERMINATE == message[0]
    end

    handler.call(message, @pipe)
  end
end
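The shape of this receive loop can be reproduced with Ruby’s Queue standing in for the PAIR socket. This is only a sketch under that assumption: the three-argument signature and the queues are illustrative, and messages are modelled as plain arrays of frames.

```ruby
TERMINATE = '$TERM' # same value as the constant documented above

# Hypothetical stand-in: a Queue of frame arrays replaces the PAIR socket.
def process_messages(inbox, replies, handler)
  loop do
    message = inbox.pop
    break if message[0] == TERMINATE

    handler.call(message, replies)
  end
end

inbox   = Queue.new
replies = Queue.new
inbox << ['foo'] << [TERMINATE] << ['never seen']

handled = []
process_messages(inbox, replies, lambda { |msg, pipe|
  handled << msg[0]
  pipe << 'bar' if msg[0] == 'foo'
})
```

Anything queued after the TERMINATE command is left unprocessed, since the loop stops before the handler is called for it.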

#receive ⇒ Message

Receive a message from the actor.

Returns:

  • (Message)

Raises:

  • (DeadActorError)

    if the actor is dead (terminated or crashed)



# File 'lib/cztop/actor.rb', line 136

def receive
  @mtx.synchronize do
    raise DeadActorError unless @running

    super
  end
end

#request(message) ⇒ Message

Same as #<<, but also waits for a response from the actor and returns it.

Parameters:

  • message (Message)

    the request to the actor

Returns:

  • (Message)

    the actor’s response

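Raises:

  • (DeadActorError)

    if the actor is dead (terminated or crashed)

  • (ArgumentError)

    if the message is TERMINATE (use #terminate instead)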



# File 'lib/cztop/actor.rb', line 150

def request(message)
  @mtx.synchronize do
    raise DeadActorError unless @running

    message = Message.coerce(message)
    raise ArgumentError, 'use #terminate' if TERMINATE == message[0]

    message.send_to(self)
    Message.receive_from(self)
  end
rescue IO::EAGAINWaitWritable
  # same as in #<<
  retry
end
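Holding the mutex across both the send and the receive is what keeps replies paired with their requests when several threads call #request at once. A plain-Ruby sketch of that pairing, with a hypothetical EchoService (not a CZTop class) and two queues in place of the actor pipe:

```ruby
# Hypothetical stand-in, not CZTop: two Queues play the role of the pipe.
class EchoService
  def initialize
    @mtx      = Mutex.new
    @requests = Queue.new
    @replies  = Queue.new
    @worker   = Thread.new do
      while (msg = @requests.pop) != :stop
        @replies << "re: #{msg}"
      end
    end
  end

  # Send and receive under one lock, so a reply always goes to the caller
  # that asked for it.
  def request(message)
    @mtx.synchronize do
      @requests << message
      @replies.pop
    end
  end

  def stop
    @requests << :stop
    @worker.join
  end
end

service = EchoService.new
answers = 8.times.map { |i| Thread.new { service.request("q#{i}") } }.map(&:value)
service.stop
```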

#send_picture(picture, *args) ⇒ void

Note:

Mainly added for Beacon. If implemented there, it wouldn’t be thread safe. And it’s not that useful to be added to SendReceiveMethods.

This method returns an undefined value.

Sends a message according to a “picture”.

Parameters:

  • picture (String)

    message’s part types

  • args (String, Integer, ...)

    values, in FFI style (each one preceded by its type, like :string, "foo")

See Also:

  • zsock_send() on http://api.zeromq.org/czmq3-0:zsock


# File 'lib/cztop/actor.rb', line 175

def send_picture(picture, *args)
  @mtx.synchronize do
    raise DeadActorError unless @running

    Zsock.send(ffi_delegate, picture, *args)
  end
end
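The “FFI style” argument list interleaves a type symbol with each value. The helper below is hypothetical (ffi_style_args and PICTURE_TYPES are not part of CZTop) and maps only two picture characters, 's' for a string and 'i' for an int; the full set of picture characters is defined by zsock_send() in CZMQ.

```ruby
# Hypothetical helper, not part of CZTop. Only two picture characters are
# mapped here ('s' => string, 'i' => int) as an illustration.
PICTURE_TYPES = { 's' => :string, 'i' => :int }.freeze

def ffi_style_args(picture, *values)
  raise ArgumentError, 'picture/value count mismatch' unless picture.length == values.length

  picture.each_char.zip(values).flat_map do |char, value|
    type = PICTURE_TYPES.fetch(char) { raise ArgumentError, "unmapped picture char: #{char}" }
    [type, value]
  end
end

args = ffi_style_args('si', 'foo', 42)
# With a live actor, the result could then be passed on as:
#   actor.send_picture('si', *args)
```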

#shim(handler) ⇒ FFI::Function (private)

Shims the given handler. The shim is used to do the handshake, to #process_messages, and ensure we’re notified when the handler has terminated.

Parameters:

  • handler (Proc, #call)

    the handler used to process messages

Returns:

  • (FFI::Function)

    the callback function to be passed to the zactor

Raises:

  • (ArgumentError)

    if invalid handler given



# File 'lib/cztop/actor.rb', line 233

def shim(handler)
  raise ArgumentError, 'invalid handler' unless handler.respond_to?(:call)

  @handler_thread      = nil
  @handler_dead_signal = Queue.new # used for signaling

  Zactor.fn do |pipe_delegate, _args|
    @mtx.synchronize do
      @handler_thread = Thread.current
      @pipe           = Socket::PAIR.from_ffi_delegate(pipe_delegate)
      @pipe.signal # handshake, so zactor_new() returns
    end

    process_messages(handler)
  rescue Exception => e
    @exception = e
  ensure
    signal_shimmed_handler_death
  end
end
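The rescue/ensure structure of the shim is what makes #crashed? and #exception work: any exception is recorded, and the death signal is sent whether the handler returned or raised. A plain-Ruby sketch of that structure follows; ShimmedHandler is a hypothetical class, and a Thread replaces the zactor thread.

```ruby
# Hypothetical stand-in for the shim: a plain Thread replaces the zactor
# thread, and a Queue carries the "handler is dead" signal.
class ShimmedHandler
  attr_reader :exception

  def initialize(&handler)
    @dead_signal = Queue.new
    @thread = Thread.new do
      begin
        handler.call
      rescue Exception => e # deliberately broad, as in the shim above
        @exception = e
      ensure
        @dead_signal.push(nil) # runs on normal return and on a crash
      end
    end
  end

  def crashed?
    !!@exception
  end

  def await_death
    @dead_signal.pop
    @thread.join
  end
end

ok  = ShimmedHandler.new { :fine }
bad = ShimmedHandler.new { raise 'boom' }
[ok, bad].each(&:await_death)
```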

#signal_shimmed_handler_death ⇒ void (private)

This method returns an undefined value.

Creates a new thread that will signal the definitive termination of the Ruby handler.

This is needed to avoid the race condition between zactor_destroy(), which will wait for a signal from the handler in case it was able to send the TERMINATE command, and the @callback, which might not have returned yet but no longer receives any messages.



# File 'lib/cztop/actor.rb', line 305

def signal_shimmed_handler_death
  # NOTE: can't just use ConditionVariable, as the signaling code might be
  # run BEFORE the waiting code.

  Thread.new do
    @handler_thread.join

    # NOTE: we do this here and not in #terminate, so it also works when
    # actor isn't terminated using #terminate
    @running = false

    @handler_dead_signal.push(nil)
  end
end
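The note about ConditionVariable comes down to buffering: a Queue keeps the signal until someone pops it, so the order of signaling and waiting doesn’t matter. A minimal standalone illustration (the variable names are made up for this sketch):

```ruby
done = Queue.new

# The signal is sent first, before anyone is waiting.
signaller = Thread.new { done.push(:dead) }
signaller.join

# The waiter arrives late; Queue#pop still returns because the item is
# buffered. A ConditionVariable#signal sent at the same point would have
# had no waiter to wake up.
received = done.pop
```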

#terminate ⇒ Boolean

Tells the actor to terminate and waits for it. Idempotent.

Returns:

  • (Boolean)

    whether it died just now (false if it was dead already)



# File 'lib/cztop/actor.rb', line 196

def terminate
  @mtx.synchronize do
    return false unless @running

    Message.new(TERMINATE).send_to(self)
    await_handler_death
    true
  end
rescue IO::EAGAINWaitWritable, IO::TimeoutError
  # same as in #<<
  retry
end
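The idempotent return value (true the first time, false afterwards) pairs well with an ensure clause, so the actor is terminated even if sending fails. The sketch below uses a hypothetical ToyActor built on Thread and Queue; it is not CZTop and only mimics the documented return values.

```ruby
# Hypothetical stand-in, not CZTop.
class ToyActor
  TERMINATE = '$TERM'

  def initialize(&handler)
    @mtx     = Mutex.new
    @running = true
    @inbox   = Queue.new
    @thread  = Thread.new do
      loop do
        message = @inbox.pop
        break if message == TERMINATE

        handler.call(message)
      end
    end
  end

  def <<(message)
    @mtx.synchronize do
      raise 'actor is dead' unless @running

      @inbox << message
    end
    self
  end

  # Returns true if the actor died just now, false if it was already dead.
  def terminate
    @mtx.synchronize do
      return false unless @running

      @inbox << TERMINATE
      @thread.join
      @running = false
      true
    end
  end

  def dead?
    !@running
  end
end

seen  = []
actor = ToyActor.new { |message| seen << message }
begin
  actor << 'a' << 'b'
ensure
  first = actor.terminate # terminate even if sending raised
end
second = actor.terminate
```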

#wait ⇒ Integer

Returns:

  • (Integer)


# File 'lib/cztop/actor.rb', line 186

def wait
  @mtx.synchronize do
    super
  end
end