Class: ZMQ::Poll
- Inherits: Object
- Defined in: lib/0mq/poll.rb
Overview
A mechanism for applications to multiplex input/output events in a level-triggered fashion over a set of sockets.
Direct Known Subclasses
Instance Attribute Summary
- #timeout ⇒ Object
  Timeout is specified in seconds.
Class Method Summary
- .poll(*sockets, &block) ⇒ Object
  Construct a Poll object and start polling.
- .poll_nonblock(*sockets, &block) ⇒ Object
  Non-blocking version of poll.
Instance Method Summary
- #initialize(*sockets) ⇒ Poll (constructor)
  Accepts a list of sockets to poll for events (ZMQ::POLLIN, ZMQ::POLLOUT, ZMQ::POLLERR).
- #run(&block) ⇒ Object
  Start polling.
- #run_nonblock(&block) ⇒ Object
  Non-blocking version of run.
Constructor Details
#initialize(*sockets) ⇒ Poll
Accepts a list of sockets to poll for events (ZMQ::POLLIN, ZMQ::POLLOUT, ZMQ::POLLERR). By default, each socket is polled for input (ZMQ::POLLIN). To poll for a different kind of event, pass the socket and event type as a key/value pair (my_socket => ZMQ::POLLOUT). Event flags can be combined with a bitwise OR if necessary (my_socket => ZMQ::POLLIN | ZMQ::POLLOUT).
The timeout can be given in seconds as a keyword argument (timeout:). See the #timeout attribute.
Does not poll until #run is called.
Example: ZMQ::Poll.new socket1, socket2, socket3 => ZMQ::POLLOUT, timeout: 1
    # File 'lib/0mq/poll.rb', line 43

    def initialize(*sockets)
      opts = sockets.last.is_a?(Hash) ? sockets.pop : {} # For Ruby 1.9
      @timeout = opts.fetch :timeout, -1

      @poll_items = []

      @socks = {}
      sockets.each { |socket| @socks[socket] = ZMQ::POLLIN }

      # Pull remaining sockets out of options hash and package into poll items.
      # Skip any option symbols in the hash; they aren't sockets.
      # Rejecting symbols allows duck-typed sockets to be included.
      @socks.merge! opts.reject {|socket, events| socket.is_a? Symbol}

      # Build table to reference ZMQ::Socket to its pointer's address.
      # This is an easy way to reconnect PollItem to ZMQ::Socket without
      # having to store multiple dimensions in the socks hash.
      @socket_lookup = {}
      @socks.each { |socket, event| @socket_lookup[socket.to_ptr.address] = socket }

      # Allocate space for C PollItem (zmq_pollitem_t) structs.
      @poll_structs = FFI::MemoryPointer.new LibZMQ::PollItem, @socks.count, true

      # Create the PollItem objects.
      # Initializing them within the FFI::MemoryPointer prevents having to copy
      # the struct data to the MemoryPointer when polling, then back again to
      # retrieve the revents flags.
      i = 0
      @socks.each do |socket, events|
        @poll_items.push LibZMQ::PollItem.new(@poll_structs[i]).tap { |pi|
          pi.socket = socket
          pi.events = events
        }
        i += 1
      end
    end
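The argument handling in #initialize can be tried out in isolation. The following is a minimal sketch, not part of the library: the module PollArgsSketch and its split method are hypothetical names, plain strings stand in for socket objects, and the flag constants are assumed to match libzmq's ZMQ_POLLIN (1) and ZMQ_POLLOUT (2).

```ruby
# Hypothetical stand-in for the argument handling in ZMQ::Poll#initialize.
module PollArgsSketch
  POLLIN  = 1 # assumed to match libzmq's ZMQ_POLLIN
  POLLOUT = 2 # assumed to match libzmq's ZMQ_POLLOUT

  # Returns [sockets_to_events, timeout] for the given argument list.
  def self.split(*sockets)
    # A trailing hash carries socket => events pairs and option symbols.
    opts    = sockets.last.is_a?(Hash) ? sockets.pop : {}
    timeout = opts.fetch(:timeout, -1)

    # Positional sockets default to polling for input.
    socks = {}
    sockets.each { |socket| socks[socket] = POLLIN }

    # Symbol keys are options, not sockets, so they are skipped.
    socks.merge!(opts.reject { |key, _events| key.is_a?(Symbol) })

    [socks, timeout]
  end
end

# Strings stand in for real sockets here.
socks, timeout = PollArgsSketch.split(
  "socket1", "socket2", "socket3" => PollArgsSketch::POLLOUT, timeout: 1
)
socks   # "socket1" and "socket2" map to POLLIN, "socket3" maps to POLLOUT
timeout # 1
```

Because only Symbol keys are rejected, any other object in the trailing hash is treated as a socket, which is what lets duck-typed sockets through.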
Instance Attribute Details
#timeout ⇒ Object
The timeout is specified in seconds. A value of 0 returns immediately (non-blocking), and a value of -1 blocks until an event occurs. Fractions of a second are allowed. The default is -1 (block indefinitely).
    # File 'lib/0mq/poll.rb', line 12

    def timeout
      @timeout
    end
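As the #run source shows, the value in seconds is converted to whole milliseconds before polling. A small sketch of that conversion, using a hypothetical helper name (poll_timeout_ms is not part of the library), makes the edge cases visible:

```ruby
# Hypothetical helper mirroring the seconds-to-milliseconds step in #run.
module TimeoutSketch
  def self.poll_timeout_ms(seconds)
    # Positive values are scaled and truncated; 0 and -1 pass through unchanged.
    seconds > 0 ? (seconds * 1000).to_i : seconds
  end
end

TimeoutSketch.poll_timeout_ms(1)      # 1000
TimeoutSketch.poll_timeout_ms(0.25)   # 250
TimeoutSketch.poll_timeout_ms(0)      # 0  (return immediately)
TimeoutSketch.poll_timeout_ms(-1)     # -1 (block indefinitely)
TimeoutSketch.poll_timeout_ms(0.0004) # 0  (truncated below one millisecond)
```

Because of the truncation, a positive timeout shorter than one millisecond ends up behaving like a non-blocking poll.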
Class Method Details
.poll(*sockets, &block) ⇒ Object
Construct a Poll object and start polling. See #initialize for parameters. See #run for block and return value.
    # File 'lib/0mq/poll.rb', line 17

    def self.poll(*sockets, &block)
      poller = new *sockets
      poller.run &block
    end
.poll_nonblock(*sockets, &block) ⇒ Object
Non-blocking version of poll. Equivalent to calling .poll with timeout: 0.
    # File 'lib/0mq/poll.rb', line 23

    def self.poll_nonblock(*sockets, &block)
      self.poll *sockets, timeout: 0, &block
    end
Instance Method Details
#run(&block) ⇒ Object
Start polling.
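Returns a hash of ZMQ::Socket => revents (the triggered event flags). Only sockets with at least one triggered event are included, so the hash is empty if nothing fired or if no sockets are being polled. If a block is given, each socket/revents pair is passed to it.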
    # File 'lib/0mq/poll.rb', line 86

    def run(&block)
      return {} if @poll_items.empty?

      # Convert seconds to milliseconds.
      timeout = @timeout > 0 ? (@timeout * 1000).to_i : @timeout

      # Poll
      rc = LibZMQ::zmq_poll @poll_structs, @poll_items.count, timeout
      ZMQ.error_check true if rc == -1

      # Create a hash of the items with triggered events.
      # (ZMQ::Socket => revents)
      triggered_items = @poll_items.select { |pi| pi.revents > 0 }
        .map { |pi| [@socket_lookup[pi.socket.address], pi.revents] }

      triggered_items = Hash[triggered_items]

      # Pass triggered sockets to block.
      triggered_items.each { |socket, revents| block.call socket, revents } if block

      triggered_items
    end
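Since revents is a bit field, a caller typically tests it with a bitwise AND against each event flag. The sketch below shows one way to decode it; the ReventsSketch module, its decode method, and the symbol names are hypothetical, and the flag values are assumed to match libzmq's ZMQ_POLLIN (1), ZMQ_POLLOUT (2), and ZMQ_POLLERR (4).

```ruby
# Hypothetical decoder for the revents values returned by #run.
module ReventsSketch
  POLLIN  = 1 # assumed to match libzmq's ZMQ_POLLIN
  POLLOUT = 2 # assumed to match libzmq's ZMQ_POLLOUT
  POLLERR = 4 # assumed to match libzmq's ZMQ_POLLERR

  FLAGS = { readable: POLLIN, writable: POLLOUT, error: POLLERR }

  # Returns the names of all flags set in the given revents bit field.
  def self.decode(revents)
    FLAGS.select { |_name, flag| (revents & flag) != 0 }.keys
  end
end

# A stand-in for the hash #run returns; strings replace real sockets.
triggered = { "socket1" => ReventsSketch::POLLIN,
              "socket2" => ReventsSketch::POLLIN | ReventsSketch::POLLOUT }

decoded = triggered.map { |socket, revents| [socket, ReventsSketch.decode(revents)] }
# socket1 is readable; socket2 is both readable and writable.
```

The same test works inside the block passed to #run, which receives each socket together with its revents value.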
#run_nonblock(&block) ⇒ Object
Non-blocking version of run. Sets the timeout to 0 before polling; the stored timeout remains 0 afterwards.
    # File 'lib/0mq/poll.rb', line 110

    def run_nonblock(&block)
      @timeout = 0
      run &block
    end