Class: Kafka::SocketWithTimeout
- Inherits: Object
  - Object
  - Kafka::SocketWithTimeout
- Defined in: lib/kafka/socket_with_timeout.rb
Overview
Opens sockets in a non-blocking fashion, ensuring that we’re not stalling for long periods of time.
It’s possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.
Instance Method Summary
- #close ⇒ Object
- #closed? ⇒ Boolean
- #initialize(host, port, connect_timeout: nil, timeout: nil) ⇒ SocketWithTimeout (constructor)
  Opens a socket.
- #read(num_bytes) ⇒ String
  Reads bytes from the socket, possibly with a timeout.
- #set_encoding(encoding) ⇒ Object
- #write(bytes) ⇒ Integer
  Writes bytes to the socket, possibly with a timeout.
Constructor Details
#initialize(host, port, connect_timeout: nil, timeout: nil) ⇒ SocketWithTimeout
Opens a socket.
    # File 'lib/kafka/socket_with_timeout.rb', line 23

    def initialize(host, port, connect_timeout: nil, timeout: nil)
      addr = Socket.getaddrinfo(host, nil)
      sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

      @timeout = timeout

      @socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
      @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

      begin
        # Initiate the socket connection in the background. If it doesn't fail
        # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
        # indicating the connection is in progress.
        @socket.connect_nonblock(sockaddr)
      rescue IO::WaitWritable
        # IO.select will block until the socket is writable or the timeout
        # is exceeded, whichever comes first.
        unless IO.select(nil, [@socket], nil, connect_timeout)
          # IO.select returns nil when the socket is not ready before timeout
          # seconds have elapsed
          @socket.close
          raise Errno::ETIMEDOUT
        end

        begin
          # Verify there is now a good connection.
          @socket.connect_nonblock(sockaddr)
        rescue Errno::EISCONN
          # The socket is connected, we're good!
        end
      end
    end
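The connect sequence in the constructor can be tried in isolation. The following is a minimal sketch using only Ruby's standard library; `connect_with_timeout` is a hypothetical helper written for this illustration (it is not part of the class), and the throwaway `TCPServer` on the loopback interface stands in for a Kafka broker.

```ruby
require "socket"

# Hypothetical helper: the same non-blocking connect sequence as the
# constructor, returning the raw Socket instead of storing it in @socket.
def connect_with_timeout(host, port, connect_timeout)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])
  socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)

  begin
    # Either succeeds immediately or raises IO::WaitWritable (EINPROGRESS).
    socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # Wait until the socket becomes writable, or give up after the timeout.
    unless IO.select(nil, [socket], nil, connect_timeout)
      socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # A second call confirms the connection; EISCONN means "already connected".
      socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # Connected.
    end
  end

  socket
end

# Assumption for the demo: a local listener on an OS-assigned port.
server = TCPServer.new("127.0.0.1", 0)
port = server.addr[1]

client = connect_with_timeout("127.0.0.1", port, 1)
```

If the listener were unreachable and the socket did not become writable within `connect_timeout` seconds, the helper would close the socket and raise `Errno::ETIMEDOUT`, matching the behavior described in the Overview.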
Instance Method Details
#close ⇒ Object
    # File 'lib/kafka/socket_with_timeout.rb', line 84

    def close
      @socket.close
    end
#closed? ⇒ Boolean
    # File 'lib/kafka/socket_with_timeout.rb', line 88

    def closed?
      @socket.closed?
    end
#read(num_bytes) ⇒ String
Reads bytes from the socket, possibly with a timeout.
    # File 'lib/kafka/socket_with_timeout.rb', line 61

    def read(num_bytes)
      unless IO.select([@socket], nil, nil, @timeout)
        raise Errno::ETIMEDOUT
      end

      @socket.read(num_bytes)
    rescue IO::EAGAINWaitReadable
      retry
    end
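To see the read timeout in action without a broker, here is a small sketch using a connected socket pair from the standard library. `read_with_timeout` is a hypothetical standalone helper that mirrors the select-then-read shape of `#read`; it assumes a Unix-like platform where `UNIXSocket.pair` is available.

```ruby
require "socket"

# Hypothetical helper mirroring the shape of #read for a plain socket.
def read_with_timeout(socket, num_bytes, timeout)
  # IO.select returns nil if nothing becomes readable within `timeout` seconds.
  unless IO.select([socket], nil, nil, timeout)
    raise Errno::ETIMEDOUT
  end

  socket.read(num_bytes)
end

reader, writer = UNIXSocket.pair

# Nothing has been written yet, so the first read attempt times out.
timed_out = begin
  read_with_timeout(reader, 5, 0.05)
  false
rescue Errno::ETIMEDOUT
  true
end

# Once data is available, the same call returns it.
writer.write("hello")
data = read_with_timeout(reader, 5, 1)
```

Note that in this sketch the timeout only bounds the wait for the socket to become readable; `socket.read(num_bytes)` itself blocks until `num_bytes` bytes have arrived or the peer closes the connection.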
#set_encoding(encoding) ⇒ Object
    # File 'lib/kafka/socket_with_timeout.rb', line 92

    def set_encoding(encoding)
      @socket.set_encoding(encoding)
    end
#write(bytes) ⇒ Integer
Writes bytes to the socket, possibly with a timeout.
    # File 'lib/kafka/socket_with_timeout.rb', line 76

    def write(bytes)
      unless IO.select(nil, [@socket], nil, @timeout)
        raise Errno::ETIMEDOUT
      end

      @socket.write(bytes)
    end
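The write path follows the same pattern with the socket in the writable set. A minimal sketch, again with a hypothetical standalone helper (`write_with_timeout`) and a local socket pair in place of a broker connection, assuming a Unix-like platform:

```ruby
require "socket"

# Hypothetical helper mirroring the shape of #write for a plain socket.
def write_with_timeout(socket, bytes, timeout)
  # IO.select returns nil if the socket is not writable within `timeout` seconds.
  unless IO.select(nil, [socket], nil, timeout)
    raise Errno::ETIMEDOUT
  end

  # IO#write returns the number of bytes written.
  socket.write(bytes)
end

left, right = UNIXSocket.pair

written = write_with_timeout(left, "ping", 1)
echoed = right.read(written)
```

With an empty send buffer the socket is writable right away, so the call returns the byte count. The `Errno::ETIMEDOUT` branch would only be taken if the socket stayed unwritable for the whole timeout, for example because the peer had stopped reading and the buffers were full.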