Class: CZTop::Poller
Overview
A non-trivial socket poller.
It can poll for readability and writability, and supports thread-safe sockets (SERVER/CLIENT/RADIO/DISH).
This implementation is NOT based on CZMQ’s zpoller. Reasons:
- zpoller can only poll for readability
Defined Under Namespace
Modules: ZMQ Classes: Aggregated, Event, ZPoller
Instance Method Summary
- #add(socket, events) ⇒ void
  Adds a socket to be polled for the given events.
- #add_reader(socket) ⇒ void
  Convenience method to register a socket for readability.
- #add_writer(socket) ⇒ void
  Convenience method to register a socket for writability.
- #event_mask_for_socket(socket) ⇒ Integer
  Returns the event mask for the given, registered socket.
- #forget_socket(socket) ⇒ Object (private)
  Discards the reference to the given socket, and forgets its event mask.
- #initialize(*readers) ⇒ Poller (constructor)
  A new instance of Poller.
- #modify(socket, events) ⇒ void
  Modifies the events of interest for the given socket.
- #ptr_for_socket(socket) ⇒ FFI::Pointer (private)
  Low-level handle.
- #remember_socket(socket, events) ⇒ Object (private)
  Keeps a reference to the given socket, and remembers its event mask.
- #remove(socket) ⇒ void
  Removes a previously registered socket.
- #remove_reader(socket) ⇒ Object
  Removes a reader socket that was registered for readability only.
- #remove_writer(socket) ⇒ Object
  Removes a writer socket that was registered for writability only.
- #simple_wait(timeout = -1) ⇒ Socket, ...
  Simpler version of #wait, which just returns the first socket of interest, if any.
- #socket_for_ptr(ptr) ⇒ Socket, Actor
  Socket corresponding to given pointer.
- #sockets ⇒ Array<CZTop::Socket>
  All sockets registered with this poller.
- #wait(timeout = -1) ⇒ Event?
  Waits for registered sockets to become readable or writable, depending on what you’re interested in.
Constructor Details
#initialize(*readers) ⇒ Poller
Returns a new instance of Poller.
# File 'lib/cztop/poller.rb', line 18
def initialize(*readers)
  @sockets = {} # needed to return the same socket objects
  @events = {} # event masks for each socket
  @poller_ptr = ZMQ.poller_new
  ObjectSpace.define_finalizer(@poller_ptr, proc do
    ptr_ptr = ::FFI::MemoryPointer.new :pointer
    ptr_ptr.write_pointer(@poller_ptr)
    ZMQ.poller_destroy(ptr_ptr)
  end)
  @event_ptr = FFI::MemoryPointer.new(ZMQ::PollerEvent)
  readers.each { |r| add_reader(r) }
end
Instance Method Details
#add(socket, events) ⇒ void
This method returns an undefined value.
Adds a socket to be polled for the given events.
# File 'lib/cztop/poller.rb', line 39
def add(socket, events)
  ptr = ptr_for_socket(socket)
  rc = ZMQ.poller_add(@poller_ptr, ptr, nil, events)
  HasFFIDelegate.raise_zmq_err if rc == -1
  remember_socket(socket, events)
end
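The events argument is a bit mask. The following is a minimal sketch of how such masks combine and how they can be tested. The module EventMask and its helpers are hypothetical, and the constants are local stand-ins assumed to carry the same values as libzmq's ZMQ_POLLIN (1) and ZMQ_POLLOUT (2); they are not the library's own constants.

```ruby
# Hypothetical helper module; not part of CZTop.
module EventMask
  POLLIN  = 1 # assumed value of ZMQ::POLLIN
  POLLOUT = 2 # assumed value of ZMQ::POLLOUT

  # True if the mask includes interest in readability.
  def self.readable?(mask)
    (mask & POLLIN) != 0
  end

  # True if the mask includes interest in writability.
  def self.writable?(mask)
    (mask & POLLOUT) != 0
  end
end

# Combine events with bitwise OR to ask for both directions at once.
both = EventMask::POLLIN | EventMask::POLLOUT

puts EventMask.readable?(both)
puts EventMask.writable?(EventMask::POLLIN)
```

With a real poller, a combined mask of this kind would presumably be passed as the second argument, for example poller.add(socket, CZTop::Poller::ZMQ::POLLIN | CZTop::Poller::ZMQ::POLLOUT).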
#add_reader(socket) ⇒ void
This method returns an undefined value.
Convenience method to register a socket for readability. See #add.
# File 'lib/cztop/poller.rb', line 50
def add_reader(socket)
  add(socket, ZMQ::POLLIN)
end
#add_writer(socket) ⇒ void
This method returns an undefined value.
Convenience method to register a socket for writability. See #add.
# File 'lib/cztop/poller.rb', line 58
def add_writer(socket)
  add(socket, ZMQ::POLLOUT)
end
#event_mask_for_socket(socket) ⇒ Integer
Returns the event mask for the given, registered socket.
# File 'lib/cztop/poller.rb', line 176
def event_mask_for_socket(socket)
  @events[socket] or
    raise ArgumentError, format('no event mask known for socket %p', socket)
end
#forget_socket(socket) ⇒ Object (private)
Discards the reference to the given socket, and forgets its event mask.
# File 'lib/cztop/poller.rb', line 204
def forget_socket(socket)
  @sockets.delete(ptr_for_socket(socket).to_i)
  @events.delete(socket)
end
#modify(socket, events) ⇒ void
This method returns an undefined value.
Modifies the events of interest for the given socket.
# File 'lib/cztop/poller.rb', line 69
def modify(socket, events)
  ptr = ptr_for_socket(socket)
  rc = ZMQ.poller_modify(@poller_ptr, ptr, events)
  HasFFIDelegate.raise_zmq_err if rc == -1
  remember_socket(socket, events)
end
#ptr_for_socket(socket) ⇒ FFI::Pointer (private)
Returns low-level handle.
# File 'lib/cztop/poller.rb', line 186
def ptr_for_socket(socket)
  raise ArgumentError unless socket.is_a?(Socket) || socket.is_a?(Actor)

  Zsock.resolve(socket)
end
#remember_socket(socket, events) ⇒ Object (private)
Keeps a reference to the given socket, and remembers its event mask.
# File 'lib/cztop/poller.rb', line 196
def remember_socket(socket, events)
  @sockets[ptr_for_socket(socket).to_i] = socket
  @events[socket] = events
end
#remove(socket) ⇒ void
This method returns an undefined value.
Removes a previously registered socket. Won’t raise if you’re trying to remove a socket that’s not registered.
# File 'lib/cztop/poller.rb', line 82
def remove(socket)
  ptr = ptr_for_socket(socket)
  rc = ZMQ.poller_remove(@poller_ptr, ptr)
  HasFFIDelegate.raise_zmq_err if rc == -1
  forget_socket(socket)
end
#remove_reader(socket) ⇒ Object
Removes a reader socket that was registered for readability only.
# File 'lib/cztop/poller.rb', line 95
def remove_reader(socket)
  if event_mask_for_socket(socket) == ZMQ::POLLIN
    remove(socket)
    return
  end
  raise ArgumentError, format('not registered for readability only: %p', socket)
end
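The "readability only" condition is an exact comparison against the stored event mask, so a socket registered for both reading and writing is rejected. The sketch below mirrors just that bookkeeping. MaskRegistry is a hypothetical stand-in that never talks to ZMQ, the symbols stand in for sockets, and the constant values are assumed to match libzmq's.

```ruby
# Hypothetical stand-in that mirrors only the poller's event-mask bookkeeping.
class MaskRegistry
  POLLIN  = 1 # assumed value of ZMQ::POLLIN
  POLLOUT = 2 # assumed value of ZMQ::POLLOUT

  def initialize
    @events = {} # event masks for each registered object
  end

  def add(socket, events)
    @events[socket] = events
  end

  def event_mask_for_socket(socket)
    @events[socket] or
      raise ArgumentError, format('no event mask known for socket %p', socket)
  end

  # Succeeds only if the stored mask is exactly POLLIN.
  def remove_reader(socket)
    unless event_mask_for_socket(socket) == POLLIN
      raise ArgumentError, format('not registered for readability only: %p', socket)
    end

    @events.delete(socket)
  end
end

registry = MaskRegistry.new
registry.add(:reader, MaskRegistry::POLLIN)
registry.add(:duplex, MaskRegistry::POLLIN | MaskRegistry::POLLOUT)

registry.remove_reader(:reader) # accepted: registered for readability only

begin
  registry.remove_reader(:duplex) # rejected: mask also contains POLLOUT
rescue ArgumentError => e
  puts e.message
end
```

For a socket registered with a combined mask, #remove drops it entirely, while #modify can narrow the mask to one direction.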
#remove_writer(socket) ⇒ Object
Removes a writer socket that was registered for writability only.
# File 'lib/cztop/poller.rb', line 109
def remove_writer(socket)
  if event_mask_for_socket(socket) == ZMQ::POLLOUT
    remove(socket)
    return
  end
  raise ArgumentError, format('not registered for writability only: %p', socket)
end
#simple_wait(timeout = -1) ⇒ Socket, ...
Simpler version of #wait, which just returns the first socket of interest, if any. This is useful if you either have only reader sockets, or only have writer sockets.
# File 'lib/cztop/poller.rb', line 150
def simple_wait(timeout = -1)
  event = wait(timeout)
  return event.socket if event
end
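Because #simple_wait returns nil when nothing becomes ready in time, it fits naturally into a while loop. The sketch below shows that pattern under stated assumptions: each_ready and FakePoller are hypothetical names introduced for illustration, the poller argument is duck-typed, and the timeout is assumed to be in milliseconds as with zmq_poller_wait.

```ruby
# Yields each socket reported by the poller until a call times out (nil).
# +poller+ can be anything that responds to #simple_wait(timeout).
def each_ready(poller, timeout_ms: 100)
  while (socket = poller.simple_wait(timeout_ms))
    yield socket
  end
end

# Hypothetical stand-in so the sketch runs without ZMQ: hands out the given
# objects one by one, then returns nil as if the wait had timed out.
class FakePoller
  def initialize(*sockets)
    @pending = sockets
  end

  def simple_wait(_timeout = -1)
    @pending.shift
  end
end

each_ready(FakePoller.new(:a, :b), timeout_ms: 0) do |socket|
  puts "ready: #{socket.inspect}"
end
```

With a real poller, the block would normally receive from (or send on) the yielded socket, since a socket that is never serviced is likely to be reported as ready again on the next call.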
#socket_for_ptr(ptr) ⇒ Socket, Actor
Returns socket corresponding to given pointer.
# File 'lib/cztop/poller.rb', line 159
def socket_for_ptr(ptr)
  @sockets[ptr.to_i] or
    raise ArgumentError, format('no socket known for pointer %p', ptr)
end
#sockets ⇒ Array<CZTop::Socket>
The actual events registered for each socket don’t matter.
Returns all sockets registered with this poller.
# File 'lib/cztop/poller.rb', line 167
def sockets
  @sockets.values
end
#wait(timeout = -1) ⇒ Event?
Waits for registered sockets to become readable or writable, depending on what you’re interested in.
# File 'lib/cztop/poller.rb', line 126
def wait(timeout = -1)
  rc = ZMQ.poller_wait(@poller_ptr, @event_ptr, timeout)
  if rc == -1
    case CZMQ::FFI::Errors.errno
    # NOTE: ETIMEDOUT for backwards compatibility, although this API is
    # still DRAFT.
    when Errno::EAGAIN::Errno, Errno::ETIMEDOUT::Errno
      return nil
    else
      HasFFIDelegate.raise_zmq_err
    end
  end
  Event.new(self, @event_ptr)
end
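As the listing shows, #wait returns nil on timeout and an Event otherwise, so callers need to handle both cases. The following is a minimal sketch of telling them apart. FakeEvent and classify are hypothetical names, and the #readable? and #writable? predicates are an assumption about the Event class; only Event#socket appears in the listings on this page.

```ruby
# Hypothetical stand-in for CZTop::Poller::Event so the sketch runs without
# ZMQ. The predicates are assumed; 1 and 2 mirror ZMQ_POLLIN and ZMQ_POLLOUT.
FakeEvent = Struct.new(:socket, :mask) do
  def readable?
    (mask & 1) != 0
  end

  def writable?
    (mask & 2) != 0
  end
end

# Classifies the result of a #wait call: nil means the wait timed out.
def classify(event)
  return :timeout if event.nil?
  return :readable if event.readable?
  return :writable if event.writable?

  :other
end

puts classify(nil)
puts classify(FakeEvent.new(:some_socket, 1))
```

With a real poller this would be called along the lines of classify(poller.wait(500)), assuming the timeout is given in milliseconds.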