Class: Kafka::SSLSocketWithTimeout
- Inherits:
-
Object
- Object
- Kafka::SSLSocketWithTimeout
- Defined in:
- lib/kafka/ssl_socket_with_timeout.rb
Overview
Opens sockets in a non-blocking fashion, ensuring that we’re not stalling for long periods of time.
It’s possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout
constructor
Opens a socket.
-
#read(num_bytes) ⇒ String
Reads bytes from the socket, possible with a timeout.
- #select_with_timeout(socket, type) ⇒ Object
- #set_encoding(encoding) ⇒ Object
-
#write(bytes) ⇒ Integer
Writes bytes to the socket, possible with a timeout.
Constructor Details
#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout
Opens a socket.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 24 def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) addr = Socket.getaddrinfo(host, nil) sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) @connect_timeout = connect_timeout @timeout = timeout @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) # first initiate the TCP socket begin # Initiate the socket connection in the background. If it doesn't fail # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) # indicating the connection is in progress. @tcp_socket.connect_nonblock(sockaddr) rescue IO::WaitWritable # select will block until the socket is writable or the timeout # is exceeded, whichever comes first. unless select_with_timeout(@tcp_socket, :connect_write) # select returns nil when the socket is not ready before timeout # seconds have elapsed @tcp_socket.close raise Errno::ETIMEDOUT end begin # Verify there is now a good connection. @tcp_socket.connect_nonblock(sockaddr) rescue Errno::EISCONN # The socket is connected, we're good! end end # once that's connected, we can start initiating the ssl socket @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context) begin # Initiate the socket connection in the background. If it doesn't fail # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) # indicating the connection is in progress. # Unlike waiting for a tcp socket to connect, you can't time out ssl socket # connections during the connect phase properly, because IO.select only partially works. # Instead, you have to retry. @ssl_socket.connect_nonblock rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable if select_with_timeout(@ssl_socket, :connect_read) retry else @ssl_socket.close close raise Errno::ETIMEDOUT end rescue IO::WaitWritable if select_with_timeout(@ssl_socket, :connect_write) retry else close raise Errno::ETIMEDOUT end end end |
Instance Method Details
#close ⇒ Object
161 162 163 164 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 161 def close @tcp_socket.close @ssl_socket.close end |
#closed? ⇒ Boolean
166 167 168 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 166 def closed? @tcp_socket.closed? || @ssl_socket.closed? end |
#read(num_bytes) ⇒ String
Reads bytes from the socket, possible with a timeout.
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 92 def read(num_bytes) buffer = String.new until buffer.length >= num_bytes begin # Unlike plain TCP sockets, SSL sockets don't support IO.select # properly. # Instead, timeouts happen on a per read basis, and we have to # catch exceptions from read_nonblock and gradually build up # our read buffer. buffer << @ssl_socket.read_nonblock(num_bytes - buffer.length) rescue IO::WaitReadable if select_with_timeout(@ssl_socket, :read) retry else raise Errno::ETIMEDOUT end rescue IO::WaitWritable if select_with_timeout(@ssl_socket, :write) retry else raise Errno::ETIMEDOUT end end end buffer end |
#select_with_timeout(socket, type) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 174 def select_with_timeout(socket, type) case type when :connect_read IO.select([socket], nil, nil, @connect_timeout) when :connect_write IO.select(nil, [socket], nil, @connect_timeout) when :read IO.select([socket], nil, nil, @timeout) when :write IO.select(nil, [socket], nil, @timeout) end end |
#set_encoding(encoding) ⇒ Object
170 171 172 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 170 def set_encoding(encoding) @tcp_socket.set_encoding(encoding) end |
#write(bytes) ⇒ Integer
Writes bytes to the socket, possible with a timeout.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 126 def write(bytes) loop do written = 0 begin # unlike plain tcp sockets, ssl sockets don't support IO.select # properly. # Instead, timeouts happen on a per write basis, and we have to # catch exceptions from write_nonblock, and gradually build up # our write buffer. written += @ssl_socket.write_nonblock(bytes) rescue Errno::EFAULT => error raise error rescue OpenSSL::SSL::SSLError, Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitWritable => error if error.is_a?(OpenSSL::SSL::SSLError) && error. == 'write would block' if select_with_timeout(@ssl_socket, :write) retry else raise Errno::ETIMEDOUT end else raise error end end # Fast, common case. break if written == bytes.size # This takes advantage of the fact that most ruby implementations # have Copy-On-Write strings. Thusly why requesting a subrange # of data, we actually don't copy data because the new string # simply references a subrange of the original. bytes = bytes[written, bytes.size] end end |