Class: CZTop::Actor

Inherits:
Object
  • Object
show all
Extended by:
HasFFIDelegate::ClassMethods
Includes:
CZMQ::FFI, HasFFIDelegate, PolymorphicZsockMethods, SendReceiveMethods, ZsockOptions
Defined in:
lib/cztop/actor.rb

Overview

Represents a CZMQ::FFI::Zactor.

About Thread-Safety

The instance methods of this class are thread-safe. So it’s safe to call #<<, #request or even #terminate from different threads. Caution: Use only these methods to communicate with the low-level zactor. Don’t use Message#send_to directly to send itself to an Actor instance, as it wouldn’t be thread-safe.

About termination

Actors should be terminated explicitly, either by calling #terminate from the current process or sending them the TERMINATE command (from outside). Not terminating them explicitly might make the process block at exit.

Examples:

Simple Actor with Ruby block

result = ""
a = CZTop::Actor.new do |msg, pipe|
  case msg[0]
  when "foo"
    pipe << "bar"
  when "append"
    result << msg[1].to_s
  when "result"
    pipe << result
  end
end
a.request("foo")[0] #=> "bar"
a.request("foo")[0] #=> "bar"
a << ["append", "baz"] << ["append", "baz"]
a.request("result")[0] #=> "bazbaz"

See Also:

Defined Under Namespace

Classes: DeadActorError

Constant Summary collapse

TERMINATE =

the command which causes an actor handler to terminate

'$TERM'
SEND_TIMEOUT =

timeout to use when sending the actor a message

20

Constants included from SendReceiveMethods

SendReceiveMethods::FD_TIMEOUT, SendReceiveMethods::JIFFY

Instance Attribute Summary collapse

Attributes included from HasFFIDelegate

#ffi_delegate

Instance Method Summary collapse

Methods included from HasFFIDelegate::ClassMethods

ffi_delegate, from_ffi_delegate

Methods included from PolymorphicZsockMethods

#set_unbounded, #signal

Methods included from SendReceiveMethods

#read_timeout, #wait_for_fd_signal, #wait_writable, #write_timeout

Methods included from ZsockOptions

#fd, #options, #readable?, #to_io, #writable?

Methods included from HasFFIDelegate

#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #to_ptr

Constructor Details

#initialize(callback = nil, c_args = nil) {|message, pipe| ... } ⇒ Actor

Creates a new actor. Either pass a callback directly or a block. The callback/block will be called for every received message.

In case the given callback is an FFI::Pointer (to a C function), it’s used as-is. It is expected to do the handshake (signal) itself.

Parameters:

  • callback (FFI::Pointer, Proc, #call) (defaults to: nil)

    pointer to a C function or just anything callable

  • c_args (FFI::Pointer, nil) (defaults to: nil)

    args, only useful if callback is an FFI::Pointer

Yield Parameters:

See Also:

  • #process_messages


76
77
78
79
80
81
82
83
84
85
# File 'lib/cztop/actor.rb', line 76

def initialize(callback = nil, c_args = nil, &handler)
  @running         = true
  @mtx             = Mutex.new
  @callback        = callback || handler
  @callback        = shim(@callback) unless @callback.is_a? ::FFI::Pointer
  ffi_delegate     = Zactor.new(@callback, c_args)

  attach_ffi_delegate(ffi_delegate)
  options.sndtimeo = SEND_TIMEOUT # see #<<
end

Instance Attribute Details

#exceptionException (readonly)

Returns the exception that crashed this actor, if any.

Returns:

  • (Exception)

    the exception that crashed this actor, if any



60
61
62
# File 'lib/cztop/actor.rb', line 60

def exception
  @exception
end

Instance Method Details

#<<(message) ⇒ self

Note:

Normally this method is asynchronous, but if the message is TERMINATE, it blocks until the actor is terminated.

Send a message to the actor.

Parameters:

  • message (Object)

    message to send to the actor, see Message.coerce

Returns:

  • (self)

    so it’s chainable

Raises:



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/cztop/actor.rb', line 96

def <<(message)
  message = Message.coerce(message)

  if TERMINATE == message[0]
    # NOTE: can't just send this to the actor. The sender might call
    # #terminate immediately, which most likely causes a hang due to race
    # conditions.
    terminate
  else
    begin
      @mtx.synchronize do
        raise DeadActorError unless @running

        message.send_to(self)
      end
    rescue IO::EAGAINWaitWritable, IO::TimeoutError
      # The sndtimeo has been reached.
      #
      # This should fix the race condition (mainly on JRuby) between
      # @running not being set to false yet but the actor handler already
      # terminating and thus not able to receive messages anymore.
      #
      # This shouldn't result in an infinite loop, since it'll stop as
      # soon as @running is set to false by #signal_shimmed_handler_death,
      # at least when using a Ruby handler.
      #
      # In case of a C function handler, it MUST NOT crash and only
      # terminate when being sent the {TERMINATE} message using #terminate (so
      # #await_handler_death can set @running to false).
      retry
    end
  end

  self
end

#crashed?Boolean

Returns whether this actor has crashed.

Returns:

  • (Boolean)

    whether this actor has crashed

See Also:



218
219
220
# File 'lib/cztop/actor.rb', line 218

def crashed?
  !!@exception # if set, it has crashed
end

#dead?Boolean

Returns whether this actor is dead (terminated or crashed).

Returns:

  • (Boolean)

    whether this actor is dead (terminated or crashed)



211
212
213
# File 'lib/cztop/actor.rb', line 211

def dead?
  !@running
end

#receiveMessage

Receive a message from the actor.

Returns:

Raises:



136
137
138
139
140
141
142
# File 'lib/cztop/actor.rb', line 136

def receive
  @mtx.synchronize do
    raise DeadActorError unless @running

    super
  end
end

#request(message) ⇒ Message

Same as #<<, but also waits for a response from the actor and returns it.

Parameters:

  • message (Message)

    the request to the actor

Returns:

  • (Message)

    the actor’s response

Raises:



150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/cztop/actor.rb', line 150

def request(message)
  @mtx.synchronize do
    raise DeadActorError unless @running

    message = Message.coerce(message)
    raise ArgumentError, 'use #terminate' if TERMINATE == message[0]

    message.send_to(self)
    Message.receive_from(self)
  end
rescue IO::EAGAINWaitWritable
  # same as in #<<
  retry
end

#send_picture(picture, *args) ⇒ void

Note:

Mainly added for Beacon. If implemented there, it wouldn’t be thread safe. And it’s not that useful to be added to SendReceiveMethods.

This method returns an undefined value.

Sends a message according to a “picture”.

Parameters:

  • picture (String)

    message’s part types

  • args (String, Integer, ...)

    values, in FFI style (each one preceeded with it’s type, like :string, "foo")

See Also:

  • on http://api.zeromq.org/czmq3-0:zsock


175
176
177
178
179
180
181
# File 'lib/cztop/actor.rb', line 175

def send_picture(picture, *args)
  @mtx.synchronize do
    raise DeadActorError unless @running

    Zsock.send(ffi_delegate, picture, *args)
  end
end

#terminateBoolean

Tells the actor to terminate and waits for it. Idempotent.

Returns:

  • (Boolean)

    whether it died just now (false if it was dead already)



196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/cztop/actor.rb', line 196

def terminate
  @mtx.synchronize do
    return false unless @running

    Message.new(TERMINATE).send_to(self)
    await_handler_death
    true
  end
rescue IO::EAGAINWaitWritable, IO::TimeoutError
  # same as in #<<
  retry
end

#waitInteger

Returns:

  • (Integer)


186
187
188
189
190
# File 'lib/cztop/actor.rb', line 186

def wait
  @mtx.synchronize do
    super
  end
end