Class: Kafka::SSLSocketWithTimeout

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/ssl_socket_with_timeout.rb

Overview

Opens sockets in a non-blocking fashion, ensuring that we’re not stalling for long periods of time.

It’s possible to set timeouts for connecting to the server, for reading data, and for writing data. Whenever a timeout is exceeded, Errno::ETIMEDOUT is raised.

Instance Method Summary collapse

Constructor Details

#initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:) ⇒ SSLSocketWithTimeout

Opens a socket.

Parameters:

  • host (String)
  • port (Integer)
  • connect_timeout (Integer) (defaults to: nil)

    the connection timeout, in seconds.

  • timeout (Integer) (defaults to: nil)

    the read and write timeout, in seconds.

  • ssl_context (OpenSSL::SSL::SSLContext)

    which SSLContext the ssl connection should use

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 24

def initialize(host, port, connect_timeout: nil, timeout: nil, ssl_context:)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])

  @connect_timeout = connect_timeout
  @timeout = timeout

  @tcp_socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @tcp_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  # first initiate the TCP socket
  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @tcp_socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless select_with_timeout(@tcp_socket, :connect_write)
      # select returns nil when the socket is not ready before timeout
      # seconds have elapsed
      @tcp_socket.close
      raise Errno::ETIMEDOUT
    end

    begin
      # Verify there is now a good connection.
      @tcp_socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end

  # once that's connected, we can start initiating the ssl socket
  @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, ssl_context)

  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    # Unlike waiting for a tcp socket to connect, you can't time out ssl socket
    # connections during the connect phase properly, because IO.select only partially works.
    # Instead, you have to retry.
    @ssl_socket.connect_nonblock
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
    if select_with_timeout(@ssl_socket, :connect_read)
      retry
    else
      @ssl_socket.close
      close
      raise Errno::ETIMEDOUT
    end
  rescue IO::WaitWritable
    if select_with_timeout(@ssl_socket, :connect_write)
      retry
    else
      close
      raise Errno::ETIMEDOUT
    end
  end
end

Instance Method Details

#closeObject



161
162
163
164
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 161

def close
  @tcp_socket.close
  @ssl_socket.close
end

#closed?Boolean

Returns:

  • (Boolean)


166
167
168
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 166

def closed?
  @tcp_socket.closed? || @ssl_socket.closed?
end

#read(num_bytes) ⇒ String

Reads bytes from the socket, possible with a timeout.

Parameters:

  • num_bytes (Integer)

    the number of bytes to read.

Returns:

  • (String)

    the data that was read from the socket.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 92

def read(num_bytes)
  buffer = String.new

  until buffer.length >= num_bytes
    begin
      # Unlike plain TCP sockets, SSL sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per read basis, and we have to
      # catch exceptions from read_nonblock and gradually build up
      # our read buffer.
      buffer << @ssl_socket.read_nonblock(num_bytes - buffer.length)
    rescue IO::WaitReadable
      if select_with_timeout(@ssl_socket, :read)
        retry
      else
        raise Errno::ETIMEDOUT
      end
    rescue IO::WaitWritable
      if select_with_timeout(@ssl_socket, :write)
        retry
      else
        raise Errno::ETIMEDOUT
      end
    end
  end

  buffer
end

#select_with_timeout(socket, type) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 174

def select_with_timeout(socket, type)
  case type
  when :connect_read
    IO.select([socket], nil, nil, @connect_timeout)
  when :connect_write
    IO.select(nil, [socket], nil, @connect_timeout)
  when :read
    IO.select([socket], nil, nil, @timeout)
  when :write
    IO.select(nil, [socket], nil, @timeout)
  end
end

#set_encoding(encoding) ⇒ Object



170
171
172
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 170

def set_encoding(encoding)
  @tcp_socket.set_encoding(encoding)
end

#write(bytes) ⇒ Integer

Writes bytes to the socket, possible with a timeout.

Parameters:

  • bytes (String)

    the data that should be written to the socket.

Returns:

  • (Integer)

    the number of bytes written.

Raises:

  • (Errno::ETIMEDOUT)

    if the timeout is exceeded.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/kafka/ssl_socket_with_timeout.rb', line 126

def write(bytes)
  loop do
    written = 0
    begin
      # unlike plain tcp sockets, ssl sockets don't support IO.select
      # properly.
      # Instead, timeouts happen on a per write basis, and we have to
      # catch exceptions from write_nonblock, and gradually build up
      # our write buffer.
      written += @ssl_socket.write_nonblock(bytes)
    rescue Errno::EFAULT => error
      raise error
    rescue OpenSSL::SSL::SSLError, Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitWritable => error
      if error.is_a?(OpenSSL::SSL::SSLError) && error.message == 'write would block'
        if select_with_timeout(@ssl_socket, :write)
          retry
        else
          raise Errno::ETIMEDOUT
        end
      else
        raise error
      end
    end

    # Fast, common case.
    break if written == bytes.size

    # This takes advantage of the fact that most ruby implementations
    # have Copy-On-Write strings. Thusly why requesting a subrange
    # of data, we actually don't copy data because the new string
    # simply references a subrange of the original.
    bytes = bytes[written, bytes.size]
  end
end