Class: CZTop::Poller::ZPoller

Inherits:
Object
Extended by:
HasFFIDelegate::ClassMethods
Includes:
CZMQ::FFI, HasFFIDelegate
Defined in:
lib/cztop/poller/zpoller.rb

Overview

This is the trivial poller based on zpoller. It only supports polling for readability, but it also supports doing that on CLIENT/SERVER sockets, which is useful for CZTop::Poller.

Instance Attribute Summary

Attributes included from HasFFIDelegate

#ffi_delegate

Instance Method Summary

Methods included from HasFFIDelegate::ClassMethods

ffi_delegate, from_ffi_delegate

Methods included from HasFFIDelegate

#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #raise_zmq_err, #to_ptr

Constructor Details

#initialize(reader, *readers) ⇒ ZPoller

Initializes the Poller. At least one reader has to be given.

Parameters:

  • reader (Socket, Actor)

    socket to poll for input

  • readers (Socket, Actor)

    any additional sockets to poll for input



# File 'lib/cztop/poller/zpoller.rb', line 18

def initialize(reader, *readers)
  @sockets = {} # to keep references and return same instances
  ptr      = Zpoller.new(reader,
                         *readers.flat_map { |r| [:pointer, r] },
                         :pointer, nil)
  attach_ffi_delegate(ptr)
  remember_socket(reader)
  readers.each { |r| remember_socket(r) }
end
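
The argument list passed to Zpoller.new above appears to follow the usual ruby-ffi convention for variadic functions: each extra argument is preceded by its type tag, and the list ends with a NULL pointer. Here is a minimal sketch of just that list-building step, using symbols as stand-ins for sockets. The helper name zpoller_varargs is hypothetical and not part of CZTop.

```ruby
# Hypothetical helper (not part of CZTop): builds the variadic tail
# that follows the first reader in the constructor call.
def zpoller_varargs(*readers)
  # Each additional reader is preceded by its FFI type tag, :pointer.
  tagged = readers.flat_map { |r| [:pointer, r] }
  # A trailing NULL pointer (nil) terminates the list.
  tagged + [:pointer, nil]
end

# Symbols stand in for real socket objects here.
p zpoller_varargs(:sock_b, :sock_c)
# => [:pointer, :sock_b, :pointer, :sock_c, :pointer, nil]
```

With no additional readers, the tail is just the terminator, [:pointer, nil].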

Instance Method Details

#add(reader) ⇒ void

This method returns an undefined value.

Adds another reader socket to the poller.

Parameters:

  • reader (Socket, Actor)

    the reader socket to add

Raises:

  • (SystemCallError)

    if this fails



# File 'lib/cztop/poller/zpoller.rb', line 33

def add(reader)
  rc = ffi_delegate.add(reader)
  raise_zmq_err(format('unable to add socket %p', reader)) if rc == -1
  remember_socket(reader)
end

#forget_socket(socket) ⇒ void (private)

This method returns an undefined value.

Forgets the socket because it has been removed from the poller.

Parameters:

  • socket (Socket, Actor)

    the socket instance to forget



# File 'lib/cztop/poller/zpoller.rb', line 107

def forget_socket(socket)
  @sockets.delete(socket.to_ptr.to_i)
end

#ignore_interrupts ⇒ void

This method returns an undefined value.

Tells the zpoller to ignore interrupts. By default, #wait returns immediately if it detects an interrupt (when zsys_interrupted is set to something other than zero). Calling this method suppresses that behavior.



# File 'lib/cztop/poller/zpoller.rb', line 75

def ignore_interrupts
  ffi_delegate.ignore_interrupts
end

#nonstop=(flag) ⇒ Object

By default, the poller stops if the process receives a SIGINT or SIGTERM signal. This makes it impossible to shut down message-based architectures like zactors. This method lets you switch off break handling. The default nonstop setting is off (false).

Setting this will cause #wait to never raise.

Parameters:

  • flag (Boolean)

    whether the poller should run nonstop



# File 'lib/cztop/poller/zpoller.rb', line 88

def nonstop=(flag)
  ffi_delegate.set_nonstop(flag)
end

#remember_socket(socket) ⇒ void (private)

This method returns an undefined value.

Remembers the socket so a call to #wait can return with the exact same instance of Socket, and it also makes sure the socket won’t get GC’d.

Parameters:

  • socket (Socket, Actor)

    the socket instance to remember



# File 'lib/cztop/poller/zpoller.rb', line 99

def remember_socket(socket)
  @sockets[socket.to_ptr.to_i] = socket
end

#remove(reader) ⇒ void

This method returns an undefined value.

Removes a reader socket from the poller.

Parameters:

  • reader (Socket, Actor)

    the reader socket to remove

Raises:

  • (ArgumentError)

    if socket was invalid, e.g. it wasn’t registered in this poller

  • (SystemCallError)

    if this fails for another reason



# File 'lib/cztop/poller/zpoller.rb', line 46

def remove(reader)
  rc = ffi_delegate.remove(reader)
  raise_zmq_err(format('unable to remove socket %p', reader)) if rc == -1
  forget_socket(reader)
end
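
Both #add and #remove treat a return code of -1 as failure and raise. The sketch below shows one way the documented split between ArgumentError and SystemCallError could be produced from an errno value. It assumes the "invalid socket" case surfaces as EINVAL, which this page does not state, and check_rc is a hypothetical helper rather than CZTop's raise_zmq_err.

```ruby
# Hypothetical helper (not CZTop's raise_zmq_err): turns a C-style
# return code plus errno into a Ruby exception.
def check_rc(rc, errno, message)
  return unless rc == -1 # anything other than -1 counts as success

  # Assumption: an invalid (e.g. unregistered) socket is reported as EINVAL.
  raise ArgumentError, message if errno == Errno::EINVAL::Errno

  # SystemCallError.new picks the matching Errno::* subclass for errno.
  raise SystemCallError.new(message, errno)
end

begin
  check_rc(-1, Errno::EINVAL::Errno, 'unable to remove socket')
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Callers that only care about failure in general would need to rescue both ArgumentError and SystemCallError, since neither is a subclass of the other.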

#socket_by_ptr(ptr) ⇒ Socket, Actor (private)

Gets the previously remembered socket associated with the given pointer.

Parameters:

  • ptr (FFI::Pointer)

    the pointer to a socket

Returns:

  • (Socket, Actor)

    the socket associated with the given pointer

Raises:

  • (SystemCallError)

    if no socket is registered under given pointer



# File 'lib/cztop/poller/zpoller.rb', line 116

def socket_by_ptr(ptr)
  @sockets[ptr.to_i] or
    # NOTE: This should never happen, since #wait will return nil if
    # +zpoller_wait+ returned NULL. But it's better to fail early in case
    # it ever returns a wrong pointer.
    raise_zmq_err("no socket known for pointer #{ptr.inspect}")
end
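
The private helpers #remember_socket, #forget_socket and #socket_by_ptr together form a small registry keyed by pointer address, so that a raw pointer can be mapped back to the exact Ruby object that was registered. Here is a self-contained sketch of that idea. FakePointer, FakeSocket and SocketRegistry are hypothetical stand-ins, and KeyError is used here in place of the error raised by raise_zmq_err.

```ruby
# Hypothetical stand-in for FFI::Pointer; only #to_i is modelled.
FakePointer = Struct.new(:address) do
  def to_i
    address
  end
end

# Hypothetical stand-in for a socket; only #to_ptr is modelled.
FakeSocket = Struct.new(:name, :to_ptr)

class SocketRegistry
  def initialize
    @sockets = {} # pointer address => socket instance
  end

  # Keeps a reference, which also protects the socket from GC.
  def remember(socket)
    @sockets[socket.to_ptr.to_i] = socket
  end

  def forget(socket)
    @sockets.delete(socket.to_ptr.to_i)
  end

  # Fails early if the pointer is unknown.
  def lookup(ptr)
    @sockets.fetch(ptr.to_i) do
      raise KeyError, "no socket known for pointer #{ptr.inspect}"
    end
  end
end

registry = SocketRegistry.new
socket   = FakeSocket.new('reader', FakePointer.new(0x1000))
registry.remember(socket)

# A different pointer object with the same address finds the same instance.
puts registry.lookup(FakePointer.new(0x1000)).equal?(socket) # prints "true"
```

Keying on the integer address rather than on the pointer object means a lookup does not depend on how pointer objects compare with one another.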

#wait(timeout = -1) ⇒ Socket, ...

Waits and returns the first socket that becomes readable.

Parameters:

  • timeout (Integer) (defaults to: -1)

    how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

Returns:

  • (Socket, Actor)

    first socket of interest

  • (nil)

    if the timeout expired

Raises:

  • (Interrupt)

    if the wait ended because the poller was terminated (interrupted) rather than because the timeout expired



# File 'lib/cztop/poller/zpoller.rb', line 59

def wait(timeout = -1)
  ptr = ffi_delegate.wait(timeout)
  if ptr.null?
    raise Interrupt if ffi_delegate.terminated

    return nil
  end
  socket_by_ptr(ptr)
end
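
The three outcomes of #wait (a readable socket, a timeout, or an interrupt) can be exercised without CZMQ by substituting a stub for the FFI delegate. In this sketch, StubDelegate and wait_on are hypothetical names, nil plays the role of a NULL pointer, and the pointer-to-socket lookup is left out.

```ruby
# Hypothetical stand-in for the FFI delegate: returns canned results
# instead of calling into CZMQ.
StubDelegate = Struct.new(:result, :terminated) do
  def wait(_timeout)
    result
  end
end

# Mirrors the control flow of #wait, with nil standing in for NULL.
def wait_on(delegate, timeout = -1)
  ptr = delegate.wait(timeout)
  return ptr unless ptr.nil? # a reader became readable

  raise Interrupt if delegate.terminated # wait was cut short

  nil # timeout expired
end

p wait_on(StubDelegate.new(:reader, false))   # => :reader
p wait_on(StubDelegate.new(nil, false), 100)  # => nil

begin
  wait_on(StubDelegate.new(nil, true))
rescue Interrupt
  puts 'interrupted'
end
```

Interrupt is not a StandardError, so a bare rescue clause will not catch it; it has to be named explicitly, as above.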