Method: Kafka::SocketWithTimeout#initialize

Defined in:
lib/kafka/socket_with_timeout.rb

#initialize(host, port, connect_timeout: nil, timeout: nil) ⇒ SocketWithTimeout

Opens a TCP socket and connects it to the given host and port.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer) (defaults to: nil)

    the connection timeout, in seconds.

  • timeout (Integer) (defaults to: nil)

    the read and write timeout, in seconds.

Raises:

  • (Errno::ETIMEDOUT)

    if the connection is not established within connect_timeout seconds.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/kafka/socket_with_timeout.rb', line 23

# Opens a TCP socket and connects it to +host+:+port+.
#
# The host is resolved with Socket.getaddrinfo and the first returned address
# is used. The socket is created with TCP_NODELAY enabled and the connection
# is established in non-blocking mode so that +connect_timeout+ can be
# enforced with IO.select.
#
# If the connection cannot be established for any reason (timeout, refusal,
# unreachable host, ...) the underlying socket is closed before the error is
# propagated, so no file descriptor is leaked.
#
# @param host [String]
# @param port [Integer]
# @param connect_timeout [Integer] the connection timeout, in seconds. When
#   nil, IO.select is called without a timeout.
# @param timeout [Integer] the read and write timeout, in seconds. It is only
#   stored in @timeout here; this method does not use it.
# @raise [Errno::ETIMEDOUT] if the connection is not established within
#   +connect_timeout+ seconds.
# @raise [SystemCallError] any other error reported by the connect call
#   (for example Errno::ECONNREFUSED).
def initialize(host, port, connect_timeout: nil, timeout: nil)
  connected = false

  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @timeout = timeout

  @socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select blocks until the socket is writable or the timeout is
    # exceeded, whichever comes first. It returns nil when the socket is not
    # ready before the timeout has elapsed.
    raise Errno::ETIMEDOUT unless IO.select(nil, [@socket], nil, connect_timeout)

    begin
      # Verify there is now a good connection. A failed connect (e.g.
      # Errno::ECONNREFUSED) surfaces here and is propagated to the caller.
      @socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end

  connected = true
ensure
  # Release the descriptor on every failure path, not only on timeout.
  # @socket is still nil when address resolution or socket creation failed.
  @socket.close if @socket && !connected
end