Class: Kafka::SSLSocketWithTimeout
- Inherits: Object
  - Object
    - Kafka::SSLSocketWithTimeout
- Defined in: lib/kafka/ssl_socket_with_timeout.rb
Overview
Opens sockets in a non-blocking fashion, ensuring that we're not stalling for long periods of time.
It's possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.
Instance Method Summary
- #close ⇒ Object
- #closed? ⇒ Boolean
- #initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout (constructor)
  Opens a socket.
- #read(num_bytes) ⇒ String
  Reads bytes from the socket, possibly with a timeout.
- #select_with_timeout(socket, type) ⇒ Object
- #set_encoding(encoding) ⇒ Object
- #write(bytes) ⇒ Integer
  Writes bytes to the socket, possibly with a timeout.
Constructor Details
#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout
Opens a socket.
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 24

def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @connect_timeout = connect_timeout
  @timeout = timeout

  @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  # first initiate the TCP socket
  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @tcp_socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless select_with_timeout(@tcp_socket, :connect_write)
      # select returns nil when the socket is not ready before timeout
      # seconds have elapsed
      @tcp_socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @tcp_socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end

  # once that's connected, we can start initiating the ssl socket
  @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)
  @ssl_socket.hostname = host

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    # Unlike waiting for a tcp socket to connect, you can't time out ssl socket
    # connections during the connect phase properly, because IO.select only partially works.
    # Instead, you have to retry.
    @ssl_socket.connect_nonblock
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
    if select_with_timeout(@ssl_socket, :connect_read)
      retry
    else
      @ssl_socket.close
      close
      raise Errno::ETIMEDOUT
    end
  rescue IO::WaitWritable
    if select_with_timeout(@ssl_socket, :connect_write)
      retry
    else
      close
      raise Errno::ETIMEDOUT
    end
  end
end
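The TCP half of the constructor follows a common pattern: start a non-blocking connect, wait for writability with a deadline, then confirm the connection. The sketch below isolates that pattern using only Ruby's standard library. The method name connect_with_timeout is hypothetical and not part of Kafka::SSLSocketWithTimeout, and the TLS handshake is deliberately left out.

```ruby
require "socket"

# Hypothetical helper (not part of ruby-kafka): opens a plain TCP socket,
# giving up with Errno::ETIMEDOUT if the connection is not established
# within `timeout` seconds. A nil timeout waits indefinitely.
def connect_with_timeout(host, port, timeout)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])
  socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)

  begin
    # Start connecting; IO::WaitWritable means "still in progress".
    socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select returns nil if the socket is not writable in time.
    unless IO.select(nil, [socket], nil, timeout)
      socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Calling connect again reports the outcome of the attempt.
      socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # Already connected, nothing more to do.
    end
  end

  socket
end
```

If the connection attempt itself fails (for example, nothing is listening on the port), the second connect_nonblock call raises the underlying error such as Errno::ECONNREFUSED rather than a timeout.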
Instance Method Details
#close ⇒ Object
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 162

def close
  @tcp_socket.close
  @ssl_socket.close
end
#closed? ⇒ Boolean
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 167

def closed?
  @tcp_socket.closed? || @ssl_socket.closed?
end
#read(num_bytes) ⇒ String
Reads bytes from the socket, possibly with a timeout.
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 93

def read(num_bytes)
  buffer = String.new

  until buffer.length >= num_bytes
    begin
      # Unlike plain TCP sockets, SSL sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per read basis, and we have to
      # catch exceptions from read_nonblock and gradually build up
      # our read buffer.
      buffer << @ssl_socket.read_nonblock(num_bytes - buffer.length)
    rescue IO::WaitReadable
      if select_with_timeout(@ssl_socket, :read)
        retry
      else
        raise Errno::ETIMEDOUT
      end
    rescue IO::WaitWritable
      if select_with_timeout(@ssl_socket, :write)
        retry
      else
        raise Errno::ETIMEDOUT
      end
    end
  end

  buffer
end
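The buffering loop in #read can be tried out without a TLS connection. The following is a minimal sketch of the same idea on an ordinary IO object such as a pipe. The name read_exactly is hypothetical, and because a plain IO never needs to write in order to read, only IO::WaitReadable is handled here, whereas the SSL version above also handles IO::WaitWritable.

```ruby
# Hypothetical helper (not part of ruby-kafka): reads exactly `num_bytes`
# from `io`, waiting at most `timeout` seconds each time no data is ready.
def read_exactly(io, num_bytes, timeout)
  buffer = String.new

  until buffer.length >= num_bytes
    begin
      # Ask only for what is still missing; a short read is fine.
      buffer << io.read_nonblock(num_bytes - buffer.length)
    rescue IO::WaitReadable
      # Nothing available yet: wait for readability or give up.
      raise Errno::ETIMEDOUT unless IO.select([io], nil, nil, timeout)
      retry
    end
  end

  buffer
end

# Usage: the second half of the message arrives slightly later.
reader, writer = IO.pipe
writer.write("hel")
producer = Thread.new { sleep 0.05; writer.write("lo") }
message = read_exactly(reader, 5, 1)
producer.join
```

Note that the timeout applies to each wait, not to the whole call, so a peer that trickles data slowly can keep the loop running longer than `timeout` seconds in total.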
#select_with_timeout(socket, type) ⇒ Object
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 175

def select_with_timeout(socket, type)
  case type
  when :connect_read
    IO.select([socket], nil, nil, @connect_timeout)
  when :connect_write
    IO.select(nil, [socket], nil, @connect_timeout)
  when :read
    IO.select([socket], nil, nil, @timeout)
  when :write
    IO.select(nil, [socket], nil, @timeout)
  end
end
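Every caller of select_with_timeout treats its result as a boolean, which relies on how IO.select reports a timeout. The short sketch below demonstrates that contract on a pipe. The helper name ready_to_read? is hypothetical and exists only for illustration.

```ruby
# Hypothetical helper (not part of ruby-kafka): true if `io` becomes
# readable within `timeout` seconds, false if the wait times out.
def ready_to_read?(io, timeout)
  # IO.select returns nil on timeout, otherwise an array of three arrays:
  # [readable, writable, errored].
  !IO.select([io], nil, nil, timeout).nil?
end

reader, writer = IO.pipe

before_write = ready_to_read?(reader, 0.05) # nothing written yet

writer.write("x")
after_write = ready_to_read?(reader, 0.05)  # one byte is waiting
```

Passing nil as the timeout makes IO.select wait indefinitely, so leaving connect_timeout or timeout at their nil defaults means the corresponding operations never raise Errno::ETIMEDOUT from this path.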
#set_encoding(encoding) ⇒ Object
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 171

def set_encoding(encoding)
  @tcp_socket.set_encoding(encoding)
end
#write(bytes) ⇒ Integer
Writes bytes to the socket, possibly with a timeout.
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 127

def write(bytes)
  loop do
    written = 0
    begin
      # Unlike plain TCP sockets, SSL sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per write basis, and we have to
      # catch exceptions from write_nonblock, and gradually build up
      # our write buffer.
      written += @ssl_socket.write_nonblock(bytes)
    rescue Errno::EFAULT => error
      raise error
    rescue OpenSSL::SSL::SSLError, Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitWritable => error
      if error.is_a?(OpenSSL::SSL::SSLError) && error.message == 'write would block'
        if select_with_timeout(@ssl_socket, :write)
          retry
        else
          raise Errno::ETIMEDOUT
        end
      else
        raise error
      end
    end

    # Fast, common case.
    break if written == bytes.size

    # This takes advantage of the fact that most Ruby implementations
    # have copy-on-write strings. Thus, when requesting a subrange
    # of data, we don't actually copy data because the new string
    # simply references a subrange of the original.
    bytes = bytes[written, bytes.size]
  end
end
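The partial-write handling in #write can be illustrated on a plain IO object. This is a sketch under stated assumptions rather than the library's implementation: write_all is a hypothetical name, it returns the number of bytes written, and it rescues only IO::WaitWritable because no OpenSSL errors can occur on a pipe.

```ruby
# Hypothetical helper (not part of ruby-kafka): writes all of `bytes` to
# `io`, waiting at most `timeout` seconds whenever the output buffer is full.
def write_all(io, bytes, timeout)
  remaining = bytes.b # binary copy, so sizes below are byte counts
  total = 0

  until remaining.empty?
    begin
      # May accept fewer bytes than offered.
      written = io.write_nonblock(remaining)
    rescue IO::WaitWritable
      # Buffer is full: wait for space or give up.
      raise Errno::ETIMEDOUT unless IO.select(nil, [io], nil, timeout)
      retry
    end

    total += written
    # Keep only the part that has not been sent yet.
    remaining = remaining.byteslice(written, remaining.bytesize - written)
  end

  total
end

# Usage: a small payload fits in the pipe buffer in one call.
reader, writer = IO.pipe
sent = write_all(writer, "hello", 1)
received = reader.read_nonblock(5)
```

Slicing by bytes rather than characters keeps the bookkeeping correct for multibyte strings, since write_nonblock reports how many bytes it accepted.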