Module: CZTop::SendReceiveMethods
Overview
Constant Summary
- FD_TIMEOUT =
Because ZMQ sockets are edge-triggered, there’s a small chance that we miss an edge (race condition). To avoid blocking forever, all waiting on the ZMQ FD is done with this timeout or less.
The race condition exists between the calls to #readable?/#writable? and waiting for the ZMQ FD. If the socket becomes readable/writable during that window, waiting for the FD without a timeout could block forever.
0.5
- JIFFY =
Sometimes the ZMQ FD just insists on readiness. To avoid hogging the CPU, a sleep of this many seconds is included in the tight loop.
0.015
Instance Method Summary
- #<<(message) ⇒ self
Sends a message.
- #now ⇒ Object private
- #read_timeout ⇒ Float?
The timeout in seconds used by IO#wait_readable.
- #receive ⇒ Message
Receives a message.
- #wait_for_fd_signal(timeout = nil) ⇒ Object
- #wait_writable(timeout = write_timeout) ⇒ true
Waits for socket to become writable.
- #write_timeout ⇒ Float?
The timeout in seconds used by IO#wait_writable.
Instance Method Details
#<<(message) ⇒ self
Sends a message.
# File 'lib/cztop/send_receive_methods.rb', line 31
def <<(message)
  Message.coerce(message).send_to(self)
  self
end
#now ⇒ Object (private)
# File 'lib/cztop/send_receive_methods.rb', line 167
def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
#read_timeout ⇒ Float?
Returns the timeout in seconds used by IO#wait_readable.
# File 'lib/cztop/send_receive_methods.rb', line 137
def read_timeout
  timeout = options.rcvtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end
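The conversion performed here (and again in #write_timeout) can be tried in isolation. The following is a minimal sketch, not part of CZTop: the module name TimeoutSketch and the method seconds_or_nil are hypothetical, and the only assumption carried over from the listing is that the socket option is an integer number of milliseconds.

```ruby
# Hypothetical stand-alone helper mirroring the conversion in the listing.
module TimeoutSketch
  # Converts a millisecond value into seconds for IO#wait_readable /
  # IO#wait_writable. Non-positive values become nil, which those
  # methods treat as "no timeout".
  def self.seconds_or_nil(milliseconds)
    return nil if milliseconds <= 0

    milliseconds.to_f / 1000
  end
end

TimeoutSketch.seconds_or_nil(1500) # => 1.5
TimeoutSketch.seconds_or_nil(0)    # => nil
TimeoutSketch.seconds_or_nil(-1)   # => nil
```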
#receive ⇒ Message
Receives a message.
# File 'lib/cztop/send_receive_methods.rb', line 45
def receive
  Message.receive_from(self)
end
#wait_for_fd_signal(timeout = nil) ⇒ Object
Only available on Ruby >= 3.2
# File 'lib/cztop/send_receive_methods.rb', line 61
def wait_for_fd_signal(timeout = nil)
  @fd_io ||= to_io

  if timeout
    if timeout > FD_TIMEOUT
      timeout = FD_TIMEOUT
    end
  else
    timeout = FD_TIMEOUT
  end

  # NOTE: always wait for readability on ZMQ FD
  @fd_io.wait_readable timeout
end
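The effect of capping every wait at FD_TIMEOUT can be observed without a ZMQ socket. The sketch below substitutes an ordinary pipe for the ZMQ FD, which is an assumption made purely for illustration. The module FdWaitSketch and its method capped_timeout are hypothetical names, and the constant simply repeats the documented value of 0.5.

```ruby
require 'io/wait' # harmless on Rubies where IO#wait_readable is already core

module FdWaitSketch
  FD_TIMEOUT = 0.5 # same value as the documented constant

  # Returns the requested timeout, but never more than FD_TIMEOUT.
  # A nil request also falls back to FD_TIMEOUT, so a wait cannot block forever.
  def self.capped_timeout(timeout)
    timeout ? [timeout, FD_TIMEOUT].min : FD_TIMEOUT
  end
end

reader, writer = IO.pipe # stand-in for the ZMQ FD

# Nothing has been written yet, so this wait times out after the cap
# (about 0.5 seconds) even though 10 seconds were requested.
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result  = reader.wait_readable(FdWaitSketch.capped_timeout(10))
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# Once data is available, the wait returns a truthy value immediately.
writer.write('x')
ready = reader.wait_readable(FdWaitSketch.capped_timeout(nil))

reader.close
writer.close
```

Because the wait returns after at most FD_TIMEOUT, a caller that missed an edge gets another chance to check the socket state rather than blocking indefinitely.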
#wait_writable(timeout = write_timeout) ⇒ true
Only available on Ruby >= 3.2
Waits for socket to become writable.
# File 'lib/cztop/send_receive_methods.rb', line 116
def wait_writable(timeout = write_timeout)
  return true if writable?

  timeout_at = now + timeout if timeout

  while true
    # p wait_writable: self, timeout: timeout

    wait_for_fd_signal timeout
    break if writable? # NOTE: ZMQ FD can't be trusted
    raise ::IO::TimeoutError if timeout_at && now >= timeout_at

    sleep JIFFY # HACK
    break if writable? # NOTE: ZMQ FD is edge-triggered. Check again before blocking.
  end

  true
end
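The loop combines a monotonic deadline with repeated state checks. A simplified sketch of that pattern, independent of ZMQ, might look like the following. Everything in it is hypothetical: the module WaitSketch, the method wait_until, and the DeadlineExceeded error, which stands in for IO::TimeoutError so that the example does not depend on Ruby >= 3.2. The block plays the role of #writable?, and the FD wait is reduced to the JIFFY sleep.

```ruby
module WaitSketch
  JIFFY = 0.015 # same value as the documented constant

  # Hypothetical stand-in for IO::TimeoutError.
  class DeadlineExceeded < StandardError; end

  # Monotonic clock reading, unaffected by wall-clock adjustments.
  def self.now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Re-checks the block until it returns a truthy value.
  # Raises DeadlineExceeded once `timeout` seconds have passed;
  # a nil timeout means there is no deadline.
  def self.wait_until(timeout)
    return true if yield

    deadline = now + timeout if timeout

    loop do
      sleep JIFFY # avoid spinning at full speed between checks
      break if yield
      raise DeadlineExceeded if deadline && now >= deadline
    end

    true
  end
end

# The condition becomes true after roughly 50 ms, well within the 1 s limit.
ready_at = WaitSketch.now + 0.05
outcome  = WaitSketch.wait_until(1.0) { WaitSketch.now >= ready_at }
```

Computing the deadline once, up front, keeps the total wait bounded no matter how many times the loop iterates.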
#write_timeout ⇒ Float?
Returns the timeout in seconds used by IO#wait_writable.
# File 'lib/cztop/send_receive_methods.rb', line 151
def write_timeout
  timeout = options.sndtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end