Module: CZTop::SendReceiveMethods

Included in:
Actor, Socket
Defined in:
lib/cztop/send_receive_methods.rb

Overview

These methods can be used on a Socket as well as an Actor. They simply delegate to methods of Message (which accept a polymorphic reference, in Ruby as well as in C).

Constant Summary

FD_TIMEOUT =

Because ZMQ sockets are edge-triggered, there’s a small chance that we miss an edge (race condition). To avoid blocking forever, all waiting on the ZMQ FD is done with this timeout or less.

The race condition exists between the calls to #readable?/#writable? and waiting for the ZMQ FD. If the socket becomes readable/writable during that time, waiting for the FD could block forever without a timeout.

0.5

JIFFY =

Sometimes the ZMQ FD just insists on readiness. To avoid hogging the CPU, a sleep of this many seconds is included in the tight loop.

0.015

Instance Method Details

#<<(message) ⇒ self

Sends a message.

Parameters:

  • message (Message, String, Array<parts>)

    the message to send

Returns:

  • (self)



# File 'lib/cztop/send_receive_methods.rb', line 31

def <<(message)
  Message.coerce(message).send_to(self)
  self
end
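
Because #<< returns self, sends can be chained. The following is a minimal sketch of that pattern, not CZTop code: `FakeSocket` and its `sent` list are hypothetical stand-ins that record messages instead of sending them, and `Array(message)` is only an illustrative substitute for whatever Message.coerce does with a String or an Array of parts.

```ruby
# Hypothetical stand-in for a socket; it records messages instead of sending them.
class FakeSocket
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Same shape as #<< above: handle the message, then return self.
  def <<(message)
    @sent << Array(message) # wrap a String, or keep an Array of parts as-is
    self
  end
end

socket = FakeSocket.new
socket << "hello" << %w[part1 part2] # chaining works because #<< returns self
p socket.sent
```

With a real Socket or Actor, each `<<` in the chain would perform one send and hand back the same object for the next one.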

#now ⇒ Object (private)



# File 'lib/cztop/send_receive_methods.rb', line 167

def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#read_timeout ⇒ Float?

Returns the timeout in seconds used by IO#wait_readable.

Returns:

  • (Float, nil)

    the timeout in seconds used by IO#wait_readable



# File 'lib/cztop/send_receive_methods.rb', line 137

def read_timeout
  timeout = options.rcvtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end
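
The conversion above divides the option value by 1000 and treats any non-positive value as "no timeout". A small standalone sketch of the same rule may make the edge cases easier to see. The helper name `timeout_in_seconds` is hypothetical, and the input is assumed to be in milliseconds, as the division in the method suggests.

```ruby
# Hypothetical helper mirroring the conversion used by #read_timeout and #write_timeout.
# Takes a timeout in milliseconds; returns seconds as a Float, or nil for "no timeout".
def timeout_in_seconds(milliseconds)
  return nil if milliseconds <= 0

  milliseconds.to_f / 1000
end

p timeout_in_seconds(-1)   # => nil
p timeout_in_seconds(0)    # => nil
p timeout_in_seconds(250)  # => 0.25
p timeout_in_seconds(1500) # => 1.5
```

Note that, under this rule, a value of 0 is mapped to nil just like a negative value.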

#receive ⇒ Message

Receives a message.

Returns:

  • (Message)



# File 'lib/cztop/send_receive_methods.rb', line 45

def receive
  Message.receive_from(self)
end

#wait_for_fd_signal(timeout = nil) ⇒ Object

Note:

Only available on Ruby >= 3.2

Waits for the ZMQ FD to become readable, for at most FD_TIMEOUT seconds.



# File 'lib/cztop/send_receive_methods.rb', line 61

def wait_for_fd_signal(timeout = nil)
  @fd_io ||= to_io

  if timeout
    if timeout > FD_TIMEOUT
      timeout = FD_TIMEOUT
    end
  else
    timeout = FD_TIMEOUT
  end

  # NOTE: always wait for readability on ZMQ FD
  @fd_io.wait_readable timeout
end
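
The effect of the timeout handling above is that a caller can shorten the wait but never extend it beyond FD_TIMEOUT. The following sketch shows that capping rule in isolation. `clamp_timeout` is a hypothetical helper, and an ordinary pipe stands in for the ZMQ FD, so this only illustrates the timing behaviour of IO#wait_readable, not ZMQ itself.

```ruby
# On older Rubies IO#wait_readable lives in io/wait; newer ones have it in core.
require "io/wait" unless IO.method_defined?(:wait_readable)

FD_TIMEOUT = 0.5 # same value as the constant documented above

# Hypothetical helper: cap a requested timeout at FD_TIMEOUT; nil means "use the cap".
def clamp_timeout(timeout)
  [timeout, FD_TIMEOUT].compact.min
end

reader, writer = IO.pipe # an ordinary pipe stands in for the ZMQ FD

empty_result = reader.wait_readable(clamp_timeout(0.05)) # nothing written yet
writer.write("x")
ready_result = reader.wait_readable(clamp_timeout(nil))  # data is available now

p clamp_timeout(10)   # => 0.5
p clamp_timeout(0.05) # => 0.05
p clamp_timeout(nil)  # => 0.5
p empty_result        # => nil
p !ready_result.nil?  # => true
```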

#wait_writable(timeout = write_timeout) ⇒ true

Note:

Only available on Ruby >= 3.2

Waits for socket to become writable.

Parameters:

  • timeout (Numeric, nil) (defaults to: write_timeout)

    timeout in seconds

Returns:

  • (true)

    if writable within timeout

Raises:

  • (IO::TimeoutError)

    if timeout has been reached

  • (CZMQ::FFI::Zsock::DestroyedError)

    if socket has already been destroyed



# File 'lib/cztop/send_receive_methods.rb', line 116

def wait_writable(timeout = write_timeout)
  return true if writable?

  timeout_at = now + timeout if timeout

  while true
    # p wait_writable: self, timeout: timeout

    wait_for_fd_signal timeout
    break if writable? # NOTE: ZMQ FD can't be trusted
    raise ::IO::TimeoutError if timeout_at && now >= timeout_at

    sleep JIFFY # HACK
    break if writable? # NOTE: ZMQ FD is edge-triggered. Check again before blocking.
  end

  true
end
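
The loop above combines three ideas: re-check readiness after every wake-up, enforce an overall deadline on a monotonic clock, and sleep briefly between iterations. A standalone sketch of that pattern, under stated assumptions, could look like this. `poll_until_ready`, `monotonic_now`, and `DeadlineReached` are hypothetical names; the block stands in for #writable?, and the sleep stands in for waiting on the FD.

```ruby
JIFFY = 0.015 # same value as the constant documented above

# Hypothetical error class; the method above raises IO::TimeoutError (Ruby >= 3.2).
class DeadlineReached < StandardError; end

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Calls the block until it returns true or the timeout (in seconds) has elapsed.
# A nil timeout means "retry indefinitely".
def poll_until_ready(timeout)
  return true if yield

  deadline = monotonic_now + timeout if timeout

  loop do
    sleep JIFFY # stand-in for waiting on the FD
    break if yield
    raise DeadlineReached if deadline && monotonic_now >= deadline
  end

  true
end

attempts = 0
result = poll_until_ready(1.0) { (attempts += 1) >= 3 } # ready on the third check
p result   # => true
p attempts # => 3

begin
  poll_until_ready(0.05) { false } # never becomes ready
rescue DeadlineReached
  puts "timed out"
end
```

Using a monotonic clock for the deadline keeps the loop unaffected by wall-clock adjustments while it is waiting.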

#write_timeout ⇒ Float?

Returns the timeout in seconds used by IO#wait_writable.

Returns:

  • (Float, nil)

    the timeout in seconds used by IO#wait_writable



# File 'lib/cztop/send_receive_methods.rb', line 151

def write_timeout
  timeout = options.sndtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end