Module: CZTop::SendReceiveMethods

Included in:
Actor, Socket
Defined in:
lib/cztop/send_receive_methods.rb

Overview

These methods are available on both Socket and Actor. They simply delegate to the corresponding methods of Message, which accept a polymorphic reference (in Ruby as well as in C).

Constant Summary

JIFFY = 0.015 (15 ms)

Instance Method Details

#<<(message) ⇒ self

Sends a message.

Parameters:

  • message (Message, String, Array<parts>)

    the message to send

Returns:

  • (self)

Raises:

See Also:



# File 'lib/cztop/send_receive_methods.rb', line 26

def <<(message)
  Message.coerce(message).send_to(self)
  self
end
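Because #<< returns self, sends can be chained. The following is a minimal sketch of that coerce-then-send pattern. SketchMessage and SketchSocket are hypothetical stand-ins written for illustration; they are not the CZTop classes and only mimic the shape of the listing above.

```ruby
# Hypothetical stand-ins for illustration only; not the CZTop classes.
class SketchMessage
  attr_reader :parts

  def initialize(parts)
    @parts = parts
  end

  # Accepts a SketchMessage, a String, or an Array of parts.
  def self.coerce(obj)
    case obj
    when SketchMessage then obj
    when String then new([obj])
    when Array then new(obj.map(&:to_s))
    else raise ArgumentError, "cannot coerce #{obj.class}"
    end
  end

  # Records the message parts in the destination's outbox.
  def send_to(destination)
    destination.outbox << parts
  end
end

class SketchSocket
  attr_reader :outbox

  def initialize
    @outbox = []
  end

  # Same shape as the listing above: coerce, send, return self.
  def <<(message)
    SketchMessage.coerce(message).send_to(self)
    self
  end
end

socket = SketchSocket.new
socket << "hello" << %w[multi part] # chaining works because #<< returns self
```

After the last line, the sketch's outbox holds two messages: one single-part and one two-part.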

#now ⇒ Object (private)



# File 'lib/cztop/send_receive_methods.rb', line 151

def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

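#read_timeout ⇒ Float?

Returns the timeout in seconds used by IO#wait_readable. The value is derived from the socket's rcvtimeo option, which is in milliseconds; the result is nil (no timeout) if that option is zero or negative.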

Returns:

  • (Float, nil)

    the timeout in seconds used by IO#wait_readable



# File 'lib/cztop/send_receive_methods.rb', line 121

def read_timeout
  timeout = options.rcvtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end
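The conversion above can be restated as a small standalone function. This is only a sketch: timeout_ms_to_seconds is a hypothetical helper name, not part of CZTop, and it assumes the input is an Integer number of milliseconds as in the listing.

```ruby
# Hypothetical helper for illustration; not part of CZTop.
# Maps a millisecond timeout to seconds, or to nil when the
# value is zero or negative (treated as "no timeout").
def timeout_ms_to_seconds(ms)
  return nil if ms <= 0

  ms.to_f / 1000
end

timeout_ms_to_seconds(-1)   # nil: wait indefinitely
timeout_ms_to_seconds(0)    # nil
timeout_ms_to_seconds(1500) # 1.5
```

Returning nil rather than 0 matters for callers such as #wait_readable, which branch on whether a timeout is present at all.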

#receive ⇒ Message

Receives a message.

Returns:

  • (Message)

Raises:

See Also:



# File 'lib/cztop/send_receive_methods.rb', line 40

def receive
  Message.receive_from(self)
end

#wait_readable(timeout = read_timeout) ⇒ true

Waits for socket to become readable.

Parameters:

  • timeout (Numeric, nil) (defaults to: read_timeout)

    timeout in seconds

Returns:

  • (true)

    if readable within timeout

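Raises:

  • (::IO::TimeoutError)

    if a timeout is given and the socket does not become readable before it elapses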



# File 'lib/cztop/send_receive_methods.rb', line 52

def wait_readable(timeout = read_timeout)
  return true if readable?

  @fd_io ||= to_io

  if timeout
    timeout_at = now + timeout

    while true
      # p wait_readable: self, timeout: timeout
      @fd_io.wait_readable(timeout)
      break if readable? # NOTE: ZMQ FD can't be trusted
      raise ::IO::TimeoutError if now >= timeout_at

      # HACK for edge case: avoid hogging CPU if FD for socket type doesn't block and just insists
      sleep JIFFY
    end
  else
    while true
      # p wait_readable: self
      @fd_io.wait_readable
      break if readable? # NOTE: ZMQ FD can't be trusted

      # HACK for edge case: avoid hogging CPU if FD for socket type doesn't block and just insists
      sleep JIFFY
    end
  end

  true
end
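The timed branch above re-checks readiness after every wake-up and compares a monotonic clock against a deadline, because the file descriptor alone is not treated as authoritative. The sketch below reproduces that loop on an ordinary pipe. The names wait_until_ready, monotonic_now, SKETCH_JIFFY and WaitTimeout are hypothetical, and WaitTimeout stands in for ::IO::TimeoutError so the sketch does not depend on a particular Ruby version.

```ruby
require 'io/wait'

# Hypothetical stand-in for ::IO::TimeoutError.
class WaitTimeout < StandardError; end

SKETCH_JIFFY = 0.015 # seconds to sleep between re-checks

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Blocks until `ready.call` is truthy, or raises WaitTimeout once
# `timeout` seconds have passed. `io` is only used to sleep on;
# `ready` is the authoritative readiness check.
def wait_until_ready(io, timeout, ready)
  return true if ready.call

  deadline = monotonic_now + timeout

  loop do
    io.wait_readable(timeout) # may return without real readiness
    break if ready.call
    raise WaitTimeout if monotonic_now >= deadline

    sleep SKETCH_JIFFY # avoid a busy loop if the FD never blocks
  end

  true
end

reader, writer = IO.pipe
ready_flag = false

begin
  wait_until_ready(reader, 0.05, -> { ready_flag })
rescue WaitTimeout
  timed_out = true # nothing became ready within 50 ms
end

writer.write('x')
ready_flag = true
result = wait_until_ready(reader, 0.05, -> { ready_flag })
```

Using a monotonic clock for the deadline keeps the loop unaffected by wall-clock adjustments, which is the role #now plays in the listing.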

#wait_writable(timeout = write_timeout) ⇒ true

Waits for socket to become writable.

Parameters:

  • timeout (Numeric, nil) (defaults to: write_timeout)

    timeout in seconds

Returns:

  • (true)

    if writable within timeout

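Raises:

  • (::IO::TimeoutError)

    if a timeout is given and the socket does not become writable before it elapses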



# File 'lib/cztop/send_receive_methods.rb', line 88

def wait_writable(timeout = write_timeout)
  return true if writable?

  @fd_io ||= to_io

  if timeout
    timeout_at = now + timeout

    while true
      # p wait_writable: self, timeout: timeout
      @fd_io.wait_writable(timeout)
      break if writable? # NOTE: ZMQ FD can't be trusted
      raise ::IO::TimeoutError if now >= timeout_at

      # HACK for edge case: avoid hogging CPU if FD for socket type doesn't block and just insists
      sleep JIFFY
    end
  else
    while true
      # p wait_writable: self
      @fd_io.wait_writable
      break if writable? # NOTE: ZMQ FD can't be trusted

      # HACK for edge case: avoid hogging CPU if FD for socket type doesn't block and just insists
      sleep JIFFY
    end
  end

  true
end

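#write_timeout ⇒ Float?

Returns the timeout in seconds used by IO#wait_writable. The value is derived from the socket's sndtimeo option, which is in milliseconds; the result is nil (no timeout) if that option is zero or negative.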

Returns:

  • (Float, nil)

    the timeout in seconds used by IO#wait_writable



# File 'lib/cztop/send_receive_methods.rb', line 135

def write_timeout
  timeout = options.sndtimeo

  if timeout <= 0
    timeout = nil
  else
    timeout = timeout.to_f / 1000
  end

  timeout
end