Class: ZMQ::Poller

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/ffi-rzmq/poll.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

bind_to_random_tcp_port, errno, error_string, nonblocking_flag, resultcode_ok?, version

Constructor Details

#initialize ⇒ Poller

Returns a new instance of Poller.



9
10
11
12
13
14
15
# File 'lib/ffi-rzmq/poll.rb', line 9

# Sets up an empty poller: a fresh ZMQ::PollItems collection, an empty
# raw-address-to-socket lookup table, and empty lists for the tracked
# sockets and for the readable/writable results.
def initialize
  @items         = ZMQ::PollItems.new
  @raw_to_socket = {}
  @sockets, @readables, @writables = [], [], []
end

Instance Attribute Details

#readables ⇒ Object (readonly)

Returns the value of attribute readables.



7
8
9
# File 'lib/ffi-rzmq/poll.rb', line 7

# Reader for the @readables instance variable (set to an empty array by the
# constructor).
# NOTE(review): presumably refreshed by #poll through update_selectables,
# which is not visible here -- confirm.
def readables
  @readables
end

#writables ⇒ Object (readonly)

Returns the value of attribute writables.



7
8
9
# File 'lib/ffi-rzmq/poll.rb', line 7

# Reader for the @writables instance variable (set to an empty array by the
# constructor).
# NOTE(review): presumably refreshed by #poll through update_selectables,
# which is not visible here -- confirm.
def writables
  @writables
end

Instance Method Details

#delete(sock) ⇒ Object

Deletes the sock for all subscribed events. Called internally when a socket has been deregistered and has no more events registered anywhere.



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/ffi-rzmq/poll.rb', line 142

# Removes +sock+ from the poller.
#
# The readable and writable deregistrations are attempted first. If either
# one reports success, that result is returned and no cleanup happens here.
# Otherwise, provided any sockets are tracked at all, +sock+ is dropped from
# the tracked socket list, from the poll items and from the raw-address
# lookup table.
#
# Returns a truthy value only when all three removals found something to
# remove; returns a falsy value otherwise.
def delete(sock)
  removed_readable = deregister_readable(sock)
  removed_writable = deregister_writable(sock)
  deregistered = removed_readable || removed_writable

  # A successful deregistration short-circuits the cleanup below.
  return deregistered if deregistered

  tracked_before = @sockets.size
  return deregistered if tracked_before.zero?

  address = sock.socket.address
  @sockets.delete_if { |candidate| candidate.socket.address == address }
  dropped_socket = tracked_before != @sockets.size

  dropped_item = @items.delete(sock)
  dropped_raw = @raw_to_socket.delete(address)

  dropped_socket && dropped_item && dropped_raw
end

#deregister(sock, events, fd = 0) ⇒ Object

Deregister the sock for events. When there are no events left, this also deletes the socket from the poll items.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/ffi-rzmq/poll.rb', line 98

# Stops polling +sock+ for the events in the +events+ bitmask.
#
# Only bits that are currently registered are cleared; bits that were never
# registered are left untouched. When no registered events remain, the
# socket is handed to #delete.
#
# Returns nil when neither a sock nor a non-zero +fd+ is given, true when at
# least one of the requested events was registered and has been removed, and
# false when the socket is unknown or none of the requested events was
# registered.
def deregister(sock, events, fd = 0)
  return unless sock || !fd.zero?

  item = @items.get(@sockets.index(sock))
  return false unless item

  # Nothing to do when none of the requested events is registered. This also
  # lets #delete, which calls back into this method, see a false result and
  # proceed with its cleanup instead of re-registering the events.
  return false if (item[:events] & events).zero?

  # Clear the requested bits in place. AND-NOT is used instead of XOR so that
  # a bit which was not set is never switched on by a deregistration.
  item[:events] &= ~events

  delete sock if item[:events].zero?
  true
end

#deregister_readable(sock) ⇒ Object

A helper method to deregister a sock for readable events.



128
129
130
# File 'lib/ffi-rzmq/poll.rb', line 128

# Convenience wrapper: deregisters +sock+ for ZMQ::POLLIN events, passing 0
# as the fd. Returns whatever #deregister returns.
def deregister_readable(sock)
  deregister(sock, ZMQ::POLLIN, 0)
end

#deregister_writable(sock) ⇒ Object

A helper method to deregister a sock for writable events.



134
135
136
# File 'lib/ffi-rzmq/poll.rb', line 134

# Convenience wrapper: deregisters +sock+ for ZMQ::POLLOUT events, passing 0
# as the fd. Returns whatever #deregister returns.
def deregister_writable(sock)
  deregister(sock, ZMQ::POLLOUT, 0)
end

#inspect ⇒ Object



164
165
166
# File 'lib/ffi-rzmq/poll.rb', line 164

# Returns the #inspect output of the underlying poll items collection
# (@items).
def inspect
  @items.inspect
end

#poll(timeout = :blocking) ⇒ Object

Checks each registered socket for selectability based on the poll items’ registered events. Will block for up to timeout milliseconds. A millisecond is 1/1000 of a second, so to block for 1 second, pass the value “1000” to #poll.

Pass “-1” or :blocking for timeout for this call to block indefinitely.

This method will return immediately when there are no registered sockets. In that case, the timeout parameter is not honored. To prevent a CPU busy-loop, the caller of this method should detect this possible condition (via #size) and throttle the call frequency.

Returns 0 when there are no registered sockets that are readable or writable.

Returns 1 (or greater) to indicate the number of readable or writable sockets. These sockets should be processed using the #readables and #writables accessors.

Returns -1 when there is an error. Use ZMQ::Util.errno to get the related error number.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/ffi-rzmq/poll.rb', line 41

# Polls the registered items once.
#
# Returns 0 immediately, without calling zmq_poll, when no items are
# registered. Otherwise the timeout is passed through the adjust helper and
# handed to LibZMQ.zmq_poll together with the items' address and size. The
# selectable lists are refreshed only when the result code is ok. The raw
# zmq_poll result is returned either way.
def poll(timeout = :blocking)
  return 0 if @items.empty?

  adjusted_timeout = adjust(timeout)
  triggered = LibZMQ.zmq_poll(@items.address, @items.size, adjusted_timeout)
  update_selectables if Util.resultcode_ok?(triggered)
  triggered
end

#poll_nonblock ⇒ Object

The non-blocking version of #poll. See the #poll description for potential exceptions.

May return -1 when an error is encountered. Check ZMQ::Util.errno to determine the underlying cause.



62
63
64
# File 'lib/ffi-rzmq/poll.rb', line 62

# Calls #poll with a timeout of 0 and returns its result.
def poll_nonblock
  poll(0)
end

#register(sock, events = ZMQ::POLLIN | ZMQ::POLLOUT, fd = 0) ⇒ Object

Register the sock for events. This method is idempotent, meaning it can be called multiple times with the same data and the socket will only get registered at most once. Calling multiple times with different values for events will OR the event information together.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/ffi-rzmq/poll.rb', line 71

# Registers +sock+ (or a raw +fd+) for the events in the +events+ bitmask.
#
# Returns false when there is nothing to register: either +events+ is zero,
# or +sock+ is nil and +fd+ is zero. Otherwise returns the item's resulting
# events bitmask.
#
# A socket that already has a poll item only gets the new events OR-ed in.
# For a new socket a poll item is created, and the socket is recorded in the
# tracked socket list and in the raw-address lookup table.
def register(sock, events = ZMQ::POLLIN | ZMQ::POLLOUT, fd = 0)
  nothing_to_watch = sock.nil? && fd.zero?
  return false if nothing_to_watch || events.zero?

  poll_item = @items.get(@sockets.index(sock))

  unless poll_item
    @sockets.push(sock)
    poll_item = LibZMQ::PollItem.new

    # Socket objects are polled through their raw socket pointer; anything
    # else is polled through the supplied file descriptor.
    socket_backed = sock.is_a?(ZMQ::Socket) || sock.is_a?(Socket)
    poll_item[:socket] = socket_backed ? sock.socket : FFI::MemoryPointer.new(0)
    poll_item[:fd] = socket_backed ? 0 : fd

    @raw_to_socket[poll_item.socket.address] = sock
    @items << poll_item
  end

  poll_item[:events] |= events
end

#register_readable(sock) ⇒ Object

A helper method to register a sock for readable events only.



116
117
118
# File 'lib/ffi-rzmq/poll.rb', line 116

# Convenience wrapper: registers +sock+ for ZMQ::POLLIN events only, passing
# 0 as the fd. Returns whatever #register returns.
def register_readable(sock)
  register(sock, ZMQ::POLLIN, 0)
end

#register_writable(sock) ⇒ Object

A helper method to register a sock for writable events only.



122
123
124
# File 'lib/ffi-rzmq/poll.rb', line 122

# Convenience wrapper: registers +sock+ for ZMQ::POLLOUT events only, passing
# 0 as the fd. Returns whatever #register returns.
def register_writable(sock)
  register(sock, ZMQ::POLLOUT, 0)
end

#size ⇒ Object



162
# File 'lib/ffi-rzmq/poll.rb', line 162

# Returns the size reported by the underlying poll items collection (@items).
def size
  @items.size
end

#to_s ⇒ Object



168
# File 'lib/ffi-rzmq/poll.rb', line 168

# String form of the poller; identical to #inspect.
def to_s
  inspect
end