Class: CZTop::Poller

Inherits:
Object
  • Object
show all
Includes:
CZMQ::FFI
Defined in:
lib/cztop/poller.rb

Overview

A non-trivial socket poller.

It can poll for readability and writability, and supports thread-safe sockets (SERVER/CLIENT/RADIO/DISH).

This implementation is NOT based on CZMQ’s zpoller. Reasons:

  • zpoller can only poll for readability

Defined Under Namespace

Modules: ZMQ Classes: Aggregated, Event, ZPoller

Instance Method Summary collapse

Constructor Details

#initialize(*readers) ⇒ Poller

Returns a new instance of Poller.

Parameters:

  • readers (Socket, Actor)

    sockets to poll for input



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/cztop/poller.rb', line 18

def initialize(*readers)
  @sockets    = {} # needed to return the same socket objects
  @events     = {} # event masks for each socket
  @poller_ptr = ZMQ.poller_new
  ObjectSpace.define_finalizer(@poller_ptr,
                               proc do
                                 ptr_ptr = ::FFI::MemoryPointer.new :pointer
                                 ptr_ptr.write_pointer(@poller_ptr)
                                 ZMQ.poller_destroy(ptr_ptr)
                               end)
  @event_ptr = FFI::MemoryPointer.new(ZMQ::PollerEvent)
  readers.each { |r| add_reader(r) }
end

Instance Method Details

#add(socket, events) ⇒ void

This method returns an undefined value.

Adds a socket to be polled for readability.

Parameters:

  • socket (Socket, Actor)

    the socket

  • events (Integer)

    bitwise-OR’d events you’re interested in (see POLLIN and POLLOUT constants in ZMQ

Raises:

  • (ArgumentError)

    if it’s not a socket



39
40
41
42
43
44
# File 'lib/cztop/poller.rb', line 39

def add(socket, events)
  ptr = ptr_for_socket(socket)
  rc  = ZMQ.poller_add(@poller_ptr, ptr, nil, events)
  HasFFIDelegate.raise_zmq_err if rc == -1
  remember_socket(socket, events)
end

#add_reader(socket) ⇒ void

This method returns an undefined value.

Convenience method to register a socket for readability. See #add.

Parameters:



50
51
52
# File 'lib/cztop/poller.rb', line 50

def add_reader(socket)
  add(socket, ZMQ::POLLIN)
end

#add_writer(socket) ⇒ void

This method returns an undefined value.

Convenience method to register a socket for writability. See #add.

Parameters:



58
59
60
# File 'lib/cztop/poller.rb', line 58

def add_writer(socket)
  add(socket, ZMQ::POLLOUT)
end

#event_mask_for_socket(socket) ⇒ Integer

Returns the event mask for the given, registered socket.

Parameters:

  • socket (Socket, Actor)

    which socket’s events to return

Returns:

  • (Integer)

    event mask for the given socket

Raises:

  • (ArgumentError)

    if socket is not registered



176
177
178
179
# File 'lib/cztop/poller.rb', line 176

def event_mask_for_socket(socket)
  @events[socket] or
    raise ArgumentError, format('no event mask known for socket %p', socket)
end

#forget_socket(socket) ⇒ Object (private)

Discards the referencel to the given socket, and forgets its event mask.

Parameters:



204
205
206
207
# File 'lib/cztop/poller.rb', line 204

def forget_socket(socket)
  @sockets.delete(ptr_for_socket(socket).to_i)
  @events.delete(socket)
end

#modify(socket, events) ⇒ void

This method returns an undefined value.

Modifies the events of interest for the given socket.

Parameters:

  • socket (Socket, Actor)

    the socket

  • events (Integer)

    events you’re interested in (see constants in ZMQ

Raises:

  • (ArgumentError)

    if it’s not a socket



69
70
71
72
73
74
# File 'lib/cztop/poller.rb', line 69

def modify(socket, events)
  ptr = ptr_for_socket(socket)
  rc  = ZMQ.poller_modify(@poller_ptr, ptr, events)
  HasFFIDelegate.raise_zmq_err if rc == -1
  remember_socket(socket, events)
end

#ptr_for_socket(socket) ⇒ FFI::Pointer (private)

Returns low-level handle.

Parameters:

Returns:

  • (FFI::Pointer)

    low-level handle

Raises:

  • (ArgumentError)

    if argument is not a socket



186
187
188
189
190
# File 'lib/cztop/poller.rb', line 186

def ptr_for_socket(socket)
  raise ArgumentError unless socket.is_a?(Socket) || socket.is_a?(Actor)

  Zsock.resolve(socket)
end

#remember_socket(socket, events) ⇒ Object (private)

Keeps a reference to the given socket, and remembers its event mask.

Parameters:

  • socket (Socket, Actor)

    the socket

  • events (Integer)

    the event mask



196
197
198
199
# File 'lib/cztop/poller.rb', line 196

def remember_socket(socket, events)
  @sockets[ptr_for_socket(socket).to_i] = socket
  @events[socket]                       = events
end

#remove(socket) ⇒ void

This method returns an undefined value.

Removes a previously registered socket. Won’t raise if you’re trying to remove a socket that’s not registered.

Parameters:

Raises:

  • (ArgumentError)

    if it’s not a socket



82
83
84
85
86
87
# File 'lib/cztop/poller.rb', line 82

def remove(socket)
  ptr = ptr_for_socket(socket)
  rc  = ZMQ.poller_remove(@poller_ptr, ptr)
  HasFFIDelegate.raise_zmq_err if rc == -1
  forget_socket(socket)
end

#remove_reader(socket) ⇒ Object

Removes a reader socket that was registered for readability only.

Parameters:

Raises:

  • (ArgumentError)

    if it’s not registered, not registered for readability, or registered for more than just readability



95
96
97
98
99
100
101
# File 'lib/cztop/poller.rb', line 95

def remove_reader(socket)
  if event_mask_for_socket(socket) == ZMQ::POLLIN
    remove(socket)
    return
  end
  raise ArgumentError, format('not registered for readability only: %p', socket)
end

#remove_writer(socket) ⇒ Object

Removes a reader socket that was registered for writability only.

Parameters:

Raises:

  • (ArgumentError)

    if it’s not registered, not registered for writability, or registered for more than just writability



109
110
111
112
113
114
115
# File 'lib/cztop/poller.rb', line 109

def remove_writer(socket)
  if event_mask_for_socket(socket) == ZMQ::POLLOUT
    remove(socket)
    return
  end
  raise ArgumentError, format('not registered for writability only: %p', socket)
end

#simple_wait(timeout = -1)) ⇒ Socket, ...

Simpler version of #wait, which just returns the first socket of interest, if any. This is useful if you either have only reader sockets, or only have writer sockets.

Parameters:

  • timeout (Integer) (defaults to: -1))

    how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

Returns:

  • (Socket, Actor)

    first socket of interest

  • (nil)

    if timeout expired

Raises:

  • (SystemCallError)

    if this failed



150
151
152
153
# File 'lib/cztop/poller.rb', line 150

def simple_wait(timeout = -1)
  event = wait(timeout)
  return event.socket if event
end

#socket_for_ptr(ptr) ⇒ Socket, Actor

Returns socket corresponding to given pointer.

Parameters:

  • ptr (FFI::Pointer)

    pointer to the socket

Returns:

  • (Socket, Actor)

    socket corresponding to given pointer

Raises:

  • (ArgumentError)

    if pointer is not known



159
160
161
162
# File 'lib/cztop/poller.rb', line 159

def socket_for_ptr(ptr)
  @sockets[ptr.to_i] or
    raise ArgumentError, format('no socket known for pointer %p', ptr)
end

#socketsArray<CZTop::Socket>

Note:

The actual events registered for each sockets don’t matter.

Returns all sockets registered with this poller.

Returns:

  • (Array<CZTop::Socket>)

    all sockets registered with this poller



167
168
169
# File 'lib/cztop/poller.rb', line 167

def sockets
  @sockets.values
end

#wait(timeout = -1)) ⇒ Event?

Waits for registered sockets to become readable or writable, depending on what you’re interested in.

Parameters:

  • timeout (Integer) (defaults to: -1))

    how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

Returns:

  • (Event)

    the first event of interest

  • (nil)

    if the timeout expired or

Raises:

  • (SystemCallError)

    if this failed



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/cztop/poller.rb', line 126

def wait(timeout = -1)
  rc = ZMQ.poller_wait(@poller_ptr, @event_ptr, timeout)
  if rc == -1
    case CZMQ::FFI::Errors.errno
      # NOTE: ETIMEDOUT for backwards compatibility, although this API is
      # still DRAFT.
    when Errno::EAGAIN::Errno, Errno::ETIMEDOUT::Errno
      return nil
    else
      HasFFIDelegate.raise_zmq_err
    end
  end
  Event.new(self, @event_ptr)
end