Class: ResilientSocket::TCPClient
- Inherits: Object (Object > ResilientSocket::TCPClient)
- Includes: SemanticLogger::Loggable
- Defined in: lib/resilient_socket/tcp_client.rb
Overview
Make Socket calls resilient by adding timeouts, retries and specific exception categories.
Resilient TCP Client with:
- Connection timeouts: the ability to time out if a connect does not complete within a reasonable time. For example, this can occur when the server is turned off without shutting down, causing clients to hang when creating new connections.
- Automatic retries on startup connection failure: for example, the server is being restarted while the client is starting. This gives the server a few seconds to restart.
- Automatic retries on active connection failures: for example, if the server is restarted during an active connection.
Connection and read timeouts are fully configurable.
- Raises ConnectionTimeout when the connection timeout is exceeded
- Raises ReadTimeout when the read timeout is exceeded
- Raises ConnectionFailure when a network error occurs whilst reading or writing
Note: Only the following methods currently have auto-reconnect enabled:
* read
* write
Future:
- Add the auto-reconnect feature to sysread, syswrite, etc…
- To be a drop-in replacement for TCPSocket, it would also need to implement the following TCPSocket instance methods: :addr, :peeraddr
Design Notes:
- Does not inherit from Socket or TCPSocket because the socket instance has to be completely destroyed and recreated after a connection failure.
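The design note above favours composition over inheritance. A minimal sketch of that idea, assuming nothing about the real class beyond what is described here: a hypothetical ReconnectingWrapper holds a TCPSocket and swaps in a brand-new instance on #connect, which would not be possible if the wrapper itself were the socket.

```ruby
require 'socket'

# Hypothetical sketch: the wrapper *holds* a socket instead of inheriting
# from TCPSocket, so the underlying socket can be thrown away and replaced.
class ReconnectingWrapper
  attr_reader :socket

  def initialize(host, port)
    @host = host
    @port = port
    connect
  end

  # Close any existing socket, then open a completely new one
  def connect
    @socket.close if @socket && !@socket.closed?
    @socket = TCPSocket.new(@host, @port)
  end

  def close
    @socket.close unless @socket.closed?
  end
end

# Throwaway local server on an ephemeral port; the kernel completes the
# handshake via the listen backlog, so no accept is needed here.
server = TCPServer.new('127.0.0.1', 0)
client = ReconnectingWrapper.new('127.0.0.1', server.addr[1])
first = client.socket
client.connect          # replaces the socket object entirely
second = client.socket
client.close
server.close
```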
Constant Summary
- @@reconnect_on_errors = [Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::EIO, Errno::ENETDOWN, Errno::ENETRESET, Errno::EPIPE, Errno::ETIMEDOUT, EOFError]
Instance Attribute Summary
- #buffered ⇒ Object (readonly)
  Returns [TrueClass|FalseClass] Whether send buffering is enabled for this connection.
- #close_on_error ⇒ Object
  Returns the value of attribute close_on_error.
- #connect_retry_count ⇒ Object
  Returns the value of attribute connect_retry_count.
- #connect_retry_interval ⇒ Object
  Returns the value of attribute connect_retry_interval.
- #connect_timeout ⇒ Object
  Returns the value of attribute connect_timeout.
- #read_timeout ⇒ Object
  Returns the value of attribute read_timeout.
- #retry_count ⇒ Object
  Returns the value of attribute retry_count.
- #server ⇒ Object (readonly)
  Returns [String] Name of the server connected to including the port number.
- #server_selector ⇒ Object
  Returns the value of attribute server_selector.
- #user_data ⇒ Object
  Supports embedding user supplied data along with this connection such as sequence number and other connection specific information.
Class Method Summary
- .connect(params = {}) ⇒ Object
  Create a connection, call the supplied block and close the connection on completion of the block.
- .reconnect_on_errors ⇒ Object
  Return the array of errors that will result in an automatic connection retry. To add any additional errors to the standard list: ResilientSocket::TCPClient.reconnect_on_errors << Errno::EPROTO
Instance Method Summary
- #alive? ⇒ Boolean
  Returns whether the connection to the server is alive.
- #close ⇒ Object
  Close the socket only if it is not already closed.
- #closed? ⇒ Boolean
  Returns whether the socket is closed.
- #connect ⇒ Object
  Connect to the TCP server.
- #initialize(parameters = {}) ⇒ TCPClient (constructor)
  Create a new TCP Client connection.
- #read(length, buffer = nil, timeout = nil) ⇒ Object
  Returns a response from the server.
- #retry_on_connection_failure ⇒ Object
  Send and/or receive data with automatic retry on connection failure.
- #setsockopt(level, optname, optval) ⇒ Object
  See: Socket#setsockopt.
- #write(data) ⇒ Object
  Send data to the server.
Constructor Details
#initialize(parameters = {}) ⇒ TCPClient
Create a new TCP Client connection
Parameters:
:server [String]
URL of the server to connect to with port number
'localhost:2000'
:servers [Array of String]
Array of URLs of servers to connect to with port numbers
['server1:2000', 'server2:2000']
The second server will only be attempted once the first server
cannot be connected to or has timed out on connect.
A read failure or timeout will not result in switching to the second
server; switching only happens on a connection failure or during an automatic reconnect
:read_timeout [Float]
Time in seconds to timeout on read
Can be overridden by supplying a timeout in the read call
Default: 60
:connect_timeout [Float]
Time in seconds to timeout when trying to connect to the server
A value of -1 will cause the connect wait time to be infinite
Default: Half of the :read_timeout (30 seconds with the default :read_timeout)
:log_level [Symbol]
Set the logging level for the TCPClient
Any valid SemanticLogger log level:
:trace, :debug, :info, :warn, :error, :fatal
Default: SemanticLogger.default_level
:buffered [Boolean]
Whether to use Nagle's Buffering algorithm (http://en.wikipedia.org/wiki/Nagle's_algorithm)
Disabling is recommended for RPC-style invocations, where we don't want to wait for an
ACK from the server before sending the last partial segment.
Buffering is recommended in a browser or file-transfer style environment,
where multiple sends are expected during a single response
Default: true
:connect_retry_count [Fixnum]
Number of times to retry connecting when a connection fails
Default: 10
:connect_retry_interval [Float]
Number of seconds between connection retry attempts after the first failed attempt
Default: 0.5
:retry_count [Fixnum]
Number of times to retry when calling #retry_on_connection_failure
This is independent of :connect_retry_count, which still applies to
connection failures. This option controls up to how many times the
supplied block is retried should a connection failure occur during the block
Default: 3
:on_connect [Proc]
This Block is invoked directly after a connection is established and before
it is made available for use. It is passed the TCPClient instance.
Typical Use Cases:
- Initialize per connection session sequence numbers
- Pass any authentication information to the server
- Perform a handshake with the server
:server_selector [Symbol|Proc]
When multiple servers are supplied using :servers, this option will
determine which server is selected from the list
:ordered
Select a server in the order supplied in the array, with the first
having the highest priority. The second server will only be connected
to if the first server is unreachable
:random
Randomly select a server from the list every time a connection
is established, including during automatic connection recovery.
:nearest
FUTURE - Not implemented yet
The server with an IP address that most closely matches the
local IP address will be attempted first
This will result in connections to servers on the localhost
first prior to looking at remote servers
:ping_time
FUTURE - Not implemented yet
The server with the lowest ping time will be selected first
Proc:
When a Proc is supplied, it will be called passing in the list
of servers. The Proc must return one server name
Example:
:server_selector => Proc.new do |servers|
servers.last
end
Default: :ordered
:close_on_error [True|False]
To prevent the connection from going into an inconsistent state,
automatically close the connection if an error occurs.
This includes a Read Timeout
Default: true
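The :server_selector behaviours can be sketched as below. This is an illustration, not the library's implementation: select_and_connect, DemoConnectionFailure and the try_connect block are hypothetical stand-ins, with the block simulating a connection attempt that fails for any server listed in `down`.

```ruby
# Hypothetical stand-in for the library's connection failure exception
class DemoConnectionFailure < StandardError; end

# Pick a server according to `selector`, calling `try_connect` for each
# attempt. Returns whatever try_connect returns for the first success.
def select_and_connect(servers, selector, &try_connect)
  case selector
  when Proc
    # The Proc chooses exactly one server; no fallback
    try_connect.call(selector.call(servers))
  when :ordered, :random
    candidates = selector == :ordered ? servers : servers.uniq.shuffle
    last_error = nil
    candidates.each do |server|
      begin
        return try_connect.call(server)   # stop at the first success
      rescue DemoConnectionFailure => e
        last_error = e
      end
    end
    # Every server failed: re-raise the last failure
    raise last_error
  else
    raise ArgumentError, "Invalid or unknown value for parameter :server_selector => #{selector}"
  end
end

down = ['server1:2000']
attempt = lambda do |server|
  raise DemoConnectionFailure, "cannot reach #{server}" if down.include?(server)
  server
end

servers = ['server1:2000', 'server2:2000']
select_and_connect(servers, :ordered, &attempt)                 # => "server2:2000"
select_and_connect(servers, Proc.new { |s| s.last }, &attempt)  # => "server2:2000"
```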
Example
client = ResilientSocket::TCPClient.new(
  :server                 => 'server:3300',
  :connect_retry_interval => 0.1,
  :connect_retry_count    => 5
)
client.retry_on_connection_failure do
  client.write('Update the database')
end
# Read 20 bytes from the server
response = client.read(20)
puts "Received: #{response}"
client.close
# File 'lib/resilient_socket/tcp_client.rb', line 219
def initialize(parameters={})
  params = parameters.dup
  @read_timeout = (params.delete(:read_timeout) || 60.0).to_f
  @connect_timeout = (params.delete(:connect_timeout) || (@read_timeout/2)).to_f
  buffered = params.delete(:buffered)
  @buffered = buffered.nil? ? true : buffered
  @connect_retry_count = params.delete(:connect_retry_count) || 10
  @retry_count = params.delete(:retry_count) || 3
  @connect_retry_interval = (params.delete(:connect_retry_interval) || 0.5).to_f
  @on_connect = params.delete(:on_connect)
  @server_selector = params.delete(:server_selector) || :ordered
  @close_on_error = params.delete(:close_on_error)
  @close_on_error = true if @close_on_error.nil?

  unless @servers = params.delete(:servers)
    raise "Missing mandatory :server or :servers" unless server = params.delete(:server)
    @servers = [ server ]
  end

  self.logger = SemanticLogger::Logger.new("#{self.class.name} #{@servers.inspect}", params.delete(:log_level))
  params.each_pair {|k,v| logger.warn "Ignoring unknown option #{k} = #{v}"}

  # Connect to the Server
  connect
end
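A standalone sketch of the option handling in #initialize above: the hash is duplicated, each known key is deleted while its default is applied, and anything left over is reported as unknown. parse_options is a hypothetical helper that returns the values instead of setting instance variables, and it covers only a subset of the options.

```ruby
# Hypothetical helper mirroring the defaults applied by #initialize
def parse_options(parameters = {})
  params = parameters.dup            # never mutate the caller's hash
  opts = {}
  opts[:read_timeout] = (params.delete(:read_timeout) || 60.0).to_f
  # Defaults to half of the read timeout
  opts[:connect_timeout] = (params.delete(:connect_timeout) || (opts[:read_timeout] / 2)).to_f
  buffered = params.delete(:buffered)
  # nil means "not supplied", so an explicit false is preserved
  opts[:buffered] = buffered.nil? ? true : buffered
  opts[:connect_retry_count] = params.delete(:connect_retry_count) || 10
  opts[:connect_retry_interval] = (params.delete(:connect_retry_interval) || 0.5).to_f
  opts[:retry_count] = params.delete(:retry_count) || 3

  servers = params.delete(:servers)
  unless servers
    server = params.delete(:server)
    raise 'Missing mandatory :server or :servers' unless server
    servers = [server]
  end
  opts[:servers] = servers

  # The real class logs a warning for each remaining key
  opts[:unknown] = params.keys
  opts
end

opts = parse_options(server: 'localhost:2000', read_timeout: 10, buffered: false, colour: 'blue')
# opts[:connect_timeout] => 5.0, opts[:buffered] => false, opts[:unknown] => [:colour]
```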
Instance Attribute Details
#buffered ⇒ Object (readonly)
Returns [TrueClass|FalseClass] Whether send buffering is enabled for this connection
# File 'lib/resilient_socket/tcp_client.rb', line 57
def buffered
  @buffered
end
#close_on_error ⇒ Object
Returns the value of attribute close_on_error.
# File 'lib/resilient_socket/tcp_client.rb', line 53
def close_on_error
  @close_on_error
end
#connect_retry_count ⇒ Object
Returns the value of attribute connect_retry_count.
# File 'lib/resilient_socket/tcp_client.rb', line 53
def connect_retry_count
  @connect_retry_count
end
#connect_retry_interval ⇒ Object
Returns the value of attribute connect_retry_interval.
# File 'lib/resilient_socket/tcp_client.rb', line 53
def connect_retry_interval
  @connect_retry_interval
end
#connect_timeout ⇒ Object
Returns the value of attribute connect_timeout.
# File 'lib/resilient_socket/tcp_client.rb', line 53
def connect_timeout
  @connect_timeout
end
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
# File 'lib/resilient_socket/tcp_client.rb', line 53
def read_timeout
  @read_timeout
end
#retry_count ⇒ Object
Returns the value of attribute retry_count.
# File 'lib/resilient_socket/tcp_client.rb', line 53
def retry_count
  @retry_count
end
#server ⇒ Object (readonly)
Returns [String] Name of the server connected to including the port number
Example:
localhost:2000
# File 'lib/resilient_socket/tcp_client.rb', line 51
def server
  @server
end
#server_selector ⇒ Object
Returns the value of attribute server_selector.
# File 'lib/resilient_socket/tcp_client.rb', line 53
def server_selector
  @server_selector
end
#user_data ⇒ Object
Supports embedding user supplied data along with this connection such as sequence number and other connection specific information
# File 'lib/resilient_socket/tcp_client.rb', line 45
def user_data
  @user_data
end
Class Method Details
.connect(params = {}) ⇒ Object
Create a connection, call the supplied block and close the connection on completion of the block
See #initialize for the list of parameters
Example
ResilientSocket::TCPClient.connect(
  :server                 => 'server:3300',
  :connect_retry_interval => 0.1,
  :connect_retry_count    => 5
) do |client|
  client.retry_on_connection_failure do
    client.write('Update the database')
  end
  response = client.read(20)
  puts "Received: #{response}"
end
# File 'lib/resilient_socket/tcp_client.rb', line 97
def self.connect(params={})
  begin
    connection = self.new(params)
    yield(connection)
  ensure
    connection.close if connection
  end
end
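.connect relies on begin/ensure so that the connection is closed even when the block raises. A minimal sketch of the same pattern, with a hypothetical DemoConnection standing in for TCPClient:

```ruby
# Hypothetical stand-in for TCPClient; only tracks whether it was closed
class DemoConnection
  attr_reader :closed

  def initialize(params = {})
    raise ArgumentError, 'Missing mandatory :server' unless params[:server]
    @closed = false
  end

  def close
    @closed = true
  end

  # Block-form constructor: yield the connection, always close it afterwards
  def self.connect(params = {})
    connection = nil
    begin
      connection = new(params)
      yield(connection)
    ensure
      # connection is nil if new raised, so guard before closing
      connection.close if connection
    end
  end
end

seen = nil
result = DemoConnection.connect(server: 'localhost:2000') do |conn|
  seen = conn
  :done   # the block's value is returned by .connect
end
```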
.reconnect_on_errors ⇒ Object
Return the array of errors that will result in an automatic connection retry
To add any additional errors to the standard list:
ResilientSocket::TCPClient.reconnect_on_errors << Errno::EPROTO
# File 'lib/resilient_socket/tcp_client.rb', line 75
def self.reconnect_on_errors
  @@reconnect_on_errors
end
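Because @@reconnect_on_errors is a class variable returned by reference, appending to it changes the retry behaviour for every instance. A sketch with a hypothetical RetryPolicy class; note that include? compares classes exactly, so a subclass of a listed error would not match.

```ruby
# Hypothetical class mirroring the shared, mutable list of retryable errors
class RetryPolicy
  @@reconnect_on_errors = [
    Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET,
    Errno::EHOSTUNREACH, Errno::EIO, Errno::ENETDOWN, Errno::ENETRESET,
    Errno::EPIPE, Errno::ETIMEDOUT, EOFError
  ]

  # Returns the array itself, so callers can append to it
  def self.reconnect_on_errors
    @@reconnect_on_errors
  end

  # Decide on the exact class of the underlying cause
  def self.retryable?(cause)
    reconnect_on_errors.include?(cause.class)
  end
end

before = RetryPolicy.retryable?(Errno::EPROTO.new)   # => false
RetryPolicy.reconnect_on_errors << Errno::EPROTO
after = RetryPolicy.retryable?(Errno::EPROTO.new)    # => true
```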
Instance Method Details
#alive? ⇒ Boolean
Returns whether the connection to the server is alive
It is useful to call this method before making a call to the server that would change data on the server
Note: This method is only useful if the server closed the connection or
if a previous connection failure occurred.
If the server is hard killed this will still return true until one
or more writes are attempted
Note: In testing, the overhead of this call is rather low, with the ability to make about 120,000 calls per second against an active connection, i.e. about 8.3 microseconds per call
# File 'lib/resilient_socket/tcp_client.rb', line 519
def alive?
  return false if @socket.closed?

  if IO.select([@socket], nil, nil, 0)
    !@socket.eof? rescue false
  else
    true
  end
rescue IOError
  false
end
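The IO.select/eof? check in #alive? can be tried against a throwaway local server using only the standard library. alive? below is a standalone copy of the logic that takes the socket as an argument; the 0.1 second sleep is an assumption, giving the peer's FIN time to arrive on the loopback interface.

```ruby
require 'socket'

# Standalone copy of the #alive? logic
def alive?(socket)
  return false if socket.closed?
  # Readable with zero timeout means either data is waiting or the peer closed
  if IO.select([socket], nil, nil, 0)
    !socket.eof? rescue false
  else
    true   # nothing to read yet, assume the connection is still up
  end
rescue IOError
  false
end

server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer = server.accept

idle = alive?(client)        # no data yet, peer still open
peer.close                   # orderly shutdown from the server side
sleep 0.1
after_close = alive?(client) # select reports readable, eof? is true
client.close
closed_locally = alive?(client)
server.close
```

As the note above says, this only detects an orderly close; a hard-killed peer still looks alive until a write fails.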
#close ⇒ Object
Close the socket only if it is not already closed
Logs a warning if an error occurs trying to close the socket
# File 'lib/resilient_socket/tcp_client.rb', line 495
def close
  @socket.close unless @socket.closed?
rescue IOError => exception
  logger.warn "IOError when attempting to close socket: #{exception.class}: #{exception.message}"
end
#closed? ⇒ Boolean
Returns whether the socket is closed
# File 'lib/resilient_socket/tcp_client.rb', line 502
def closed?
  @socket.closed?
end
#connect ⇒ Object
Connect to the TCP server
Raises ConnectionTimeout when the time taken to create a connection
exceeds the :connect_timeout
Raises ConnectionFailure whenever Socket raises an error such as Errno::EACCES; see Socket#connect for more information
Error handling is implemented as follows:
- TCP Socket Connect failure: Cannot reach the server; the server is being restarted or is not running.
  Retry :connect_retry_count times (default 10), waiting :connect_retry_interval seconds (default 0.5) between attempts, before raising a ConnectionFailure
  - With the defaults, all calls to #connect will take at least 5 seconds before failing if the server is not running
  - Allows a hot restart of the server process if it restarts within 5 seconds
- TCP Socket Connect timeout: Timed out after :connect_timeout seconds (default 30) trying to connect to the server.
  Usually means the server is busy or the remote server disappeared off the network recently.
  No retry, just raise a ConnectionTimeout
Note: When multiple servers are supplied it will only try to connect to
the subsequent servers once the retry count has been exceeded
Note: Calling #connect on an open connection will close the current connection
and create a new connection
# File 'lib/resilient_socket/tcp_client.rb', line 268
def connect
  @socket.close if @socket && !@socket.closed?
  if @servers.size > 1
    case
    when @server_selector.is_a?(Proc)
      connect_to_server(@server_selector.call(@servers))

    when @server_selector == :ordered
      # Try each server in sequence
      exception = nil
      @servers.find do |server|
        begin
          connect_to_server(server)
          exception = nil
          true
        rescue ConnectionFailure => exc
          exception = exc
          false
        end
      end
      # Raise Exception once it has also failed to connect to all servers
      raise(exception) if exception

    when @server_selector == :random
      # Pick each server randomly, trying each server until one can be connected to
      # If no server can be connected to a ConnectionFailure is raised
      servers_to_try = @servers.uniq
      exception = nil
      servers_to_try.size.times do |i|
        server = servers_to_try[rand(servers_to_try.size)]
        servers_to_try.delete(server)
        begin
          connect_to_server(server)
          exception = nil
          # Stop at the first server that accepts the connection
          break
        rescue ConnectionFailure => exc
          exception = exc
        end
      end
      # Raise Exception once it has also failed to connect to all servers
      raise(exception) if exception

    else
      raise ArgumentError.new("Invalid or unknown value for parameter :server_selector => #{@server_selector}")
    end
  else
    connect_to_server(@servers.first)
  end

  # Invoke user supplied Block every time a new connection has been established
  @on_connect.call(self) if @on_connect
  true
end
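The connect rules described above (retry on a connect failure, sleep between attempts, no retry on a connect timeout) can be sketched with the standard library's Socket.tcp and its connect_timeout: option. connect_with_retry and the two error classes are hypothetical; the real class delegates to a connect_to_server method whose body is not shown on this page.

```ruby
require 'socket'

# Hypothetical error classes for this sketch only
class ConnectTimeoutError < StandardError; end
class ConnectFailureError < StandardError; end

def connect_with_retry(host, port, connect_timeout: 30.0, retry_count: 10, retry_interval: 0.5)
  attempts = 0
  begin
    attempts += 1
    Socket.tcp(host, port, connect_timeout: connect_timeout)
  rescue Errno::ETIMEDOUT => e
    # A connect timeout is not retried
    raise ConnectTimeoutError, "Timed out after #{connect_timeout} seconds connecting to #{host}:#{port}: #{e.message}"
  rescue SystemCallError => e
    # e.g. Errno::ECONNREFUSED while the server is restarting
    if attempts <= retry_count
      sleep(retry_interval)
      retry
    end
    raise ConnectFailureError, "#{e.class}: #{e.message} after #{attempts} attempts"
  end
end
```

With retry_count: 10 and retry_interval: 0.5 a refused connection takes at least 5 seconds to fail, matching the defaults described above.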
#read(length, buffer = nil, timeout = nil) ⇒ Object
Returns a response from the server
Raises ConnectionTimeout when the time taken to create a connection
exceeds the :connect_timeout
Connection is closed
Raises ConnectionFailure whenever Socket raises an error such as
Errno::EACCES; see Socket#connect for more information
Connection is closed when :close_on_error is true (the default)
Raises ReadTimeout if the timeout has been exceeded waiting for the
requested number of bytes from the server
Partial data will not be returned
Connection is only left open when :close_on_error is false, in which case
#read can be called again later to read the response from the connection
Parameters
length [Fixnum]
The number of bytes to return
#read will not return until 'length' bytes have been received from
the server
buffer [String]
Optional: String into which the data is read, passed through to IO#read
timeout [Float]
Optional: Override the default read timeout for this read
Number of seconds before raising ReadTimeout when no data has
been returned
A value of -1 will wait forever for a response on the socket
Default: :read_timeout supplied to #initialize
Note: After a ResilientSocket::ReadTimeout, #read can be called again on
the same socket to read the response later, provided :close_on_error is false.
If the application no longer wants the connection after a
ResilientSocket::ReadTimeout, then the #close method _must_ be called
before calling _connect_ or _retry_on_connection_failure_ to create
a new connection
# File 'lib/resilient_socket/tcp_client.rb', line 380
def read(length, buffer=nil, timeout=nil)
  result = nil
  logger.benchmark_debug("#read <== read #{length} bytes") do
    if timeout != -1
      # Block on data to read for @read_timeout seconds
      ready = begin
        ready = IO.select([@socket], nil, [@socket], timeout || @read_timeout)
      rescue IOError => exception
        logger.warn "#read Connection failure while waiting for data: #{exception.class}: #{exception.message}"
        close if close_on_error
        raise ConnectionFailure.new("#{exception.class}: #{exception.message}", @server, exception)
      rescue Exception
        # Close the connection on any other exception since the connection
        # will now be in an inconsistent state
        close if close_on_error
        raise
      end
      unless ready
        close if close_on_error
        logger.warn "#read Timeout waiting for server to reply"
        raise ReadTimeout.new("Timedout after #{timeout || @read_timeout} seconds trying to read from #{@server}")
      end
    end

    # Read data from socket
    begin
      result = buffer.nil? ? @socket.read(length) : @socket.read(length, buffer)
      logger.trace("#read <== received", result.inspect)

      # EOF before all the data was returned
      if result.nil? || (result.length < length)
        close if close_on_error
        logger.warn "#read server closed the connection before #{length} bytes were returned"
        raise ConnectionFailure.new("Connection lost while reading data", @server, EOFError.new("end of file reached"))
      end
    rescue SystemCallError, IOError => exception
      close if close_on_error
      logger.warn "#read Connection failure while reading data: #{exception.class}: #{exception.message}"
      raise ConnectionFailure.new("#{exception.class}: #{exception.message}", @server, exception)
    rescue Exception
      # Close the connection on any other exception since the connection
      # will now be in an inconsistent state
      close if close_on_error
      raise
    end
  end
  result
end
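The two phases of #read (wait with IO.select for up to the timeout, then read exactly length bytes and treat a short read as a lost connection) are sketched below against a local server. read_exactly, ReadTimeoutError and LostConnectionError are hypothetical names, and the close_on_error handling is omitted.

```ruby
require 'socket'

# Hypothetical error classes for this sketch only
class ReadTimeoutError < StandardError; end
class LostConnectionError < StandardError; end

def read_exactly(socket, length, timeout)
  # Phase 1: wait up to `timeout` seconds for the socket to become readable
  unless IO.select([socket], nil, [socket], timeout)
    raise ReadTimeoutError, "Timed out after #{timeout} seconds"
  end
  # Phase 2: blocks until `length` bytes arrive or the peer closes
  data = socket.read(length)
  if data.nil? || data.length < length
    raise LostConnectionError, 'server closed the connection early'
  end
  data
end

server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer = server.accept

timed_out = begin
  read_exactly(client, 5, 0.05)
  false
rescue ReadTimeoutError
  true
end

peer.write('hello world')
first = read_exactly(client, 5, 1)   # => "hello"
peer.close                           # only " world" (6 bytes) remains
short = begin
  read_exactly(client, 10, 1)
  nil
rescue LostConnectionError => e
  e
end
client.close
server.close
```

As in #read, only the IO.select wait is bounded by the timeout; once some data is readable, socket.read(length) blocks until length bytes or EOF.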
#retry_on_connection_failure ⇒ Object
Send and/or receive data with automatic retry on connection failure
On a connection failure, it will create a new connection and retry the block. Returns immediately on exception ReadTimeout. The connection is always closed on ConnectionFailure regardless of close_on_error
- Example of a resilient read-only request:
  When reading data from a server that does not change state on the server,
  wrap both the send and the read with #retry_on_connection_failure, since it is
  safe to send the same data twice to the server

  # Since the send can be sent many times it is safe to also put the receive
  # inside the retry block
  value = client.retry_on_connection_failure do
    client.write("GETVALUE:count\n")
    client.read(20).strip.to_i
  end

- Example of a resilient request that modifies data on the server:
  When changing state on the server, for example when updating a value,
  wrap only the send with #retry_on_connection_failure.
  The read must be outside the #retry_on_connection_failure since we must not
  retry the send if the connection fails during the #read

  value = 45
  # Only the send is within the retry block since we cannot re-send once
  # the send was successful since the server may have made the change
  client.retry_on_connection_failure do
    client.write("SETVALUE:#{value}\n")
  end
  # Server returns "SAVED" if the call was successful
  result = client.read(20).strip
Error handling is implemented as follows:
If a network failure occurs during the block invocation, the block
will be called again with a new connection to the server.
It will only be retried up to :retry_count times (default 3), and only when
the underlying error is listed in .reconnect_on_errors
The re-connect will independently retry and time out using all the
rules of #connect
# File 'lib/resilient_socket/tcp_client.rb', line 470
def retry_on_connection_failure
  retries = 0
  begin
    connect if closed?
    yield(self)
  rescue ConnectionFailure => exception
    exc_str = exception.cause ? "#{exception.cause.class}: #{exception.cause.message}" : exception.message
    # Re-raise exceptions that should not be retried
    if !self.class.reconnect_on_errors.include?(exception.cause.class)
      logger.warn "#retry_on_connection_failure not configured to retry: #{exc_str}"
      raise exception
    elsif retries < @retry_count
      retries += 1
      logger.warn "#retry_on_connection_failure retry #{retries} due to #{exception.class}: #{exception.message}"
      connect
      retry
    end
    logger.error "#retry_on_connection_failure Connection failure: #{exception.class}: #{exception.message}. Giving up after #{retries} retries"
    raise ConnectionFailure.new("After #{retries} retries to host '#{server}': #{exc_str}", @server, exception.cause)
  end
end
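The retry policy above reduces to a small loop: retry only when the underlying error is in the retryable list, reconnect before each retry, and give up after retry_count retries. A sketch with hypothetical names (with_retries, DemoConnectionFailure, RETRYABLE); the underlying error is stored as cause_error rather than overriding Exception#cause.

```ruby
# Hypothetical failure type carrying the low-level error that caused it
class DemoConnectionFailure < StandardError
  attr_reader :cause_error

  def initialize(message, cause_error)
    super(message)
    @cause_error = cause_error
  end
end

RETRYABLE = [Errno::ECONNRESET, Errno::EPIPE, EOFError].freeze

def with_retries(retry_count: 3, reconnect: -> {})
  retries = 0
  begin
    yield
  rescue DemoConnectionFailure => e
    # Not a retryable cause: re-raise the original failure immediately
    raise unless RETRYABLE.include?(e.cause_error.class)
    if retries < retry_count
      retries += 1
      reconnect.call   # stands in for #connect
      retry
    end
    raise DemoConnectionFailure.new("After #{retries} retries: #{e.message}", e.cause_error)
  end
end

calls = 0
reconnects = 0
value = with_retries(reconnect: -> { reconnects += 1 }) do
  calls += 1
  raise DemoConnectionFailure.new('reset', Errno::ECONNRESET.new) if calls < 3
  :ok
end
# value => :ok after 3 calls and 2 reconnects
```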
#setsockopt(level, optname, optval) ⇒ Object
See: Socket#setsockopt
# File 'lib/resilient_socket/tcp_client.rb', line 532
def setsockopt(level, optname, optval)
  @socket.setsockopt(level, optname, optval)
end
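#setsockopt forwards directly to the socket, so options such as TCP_NODELAY (which disables Nagle's algorithm, the behaviour controlled by :buffered) can be set with the standard constants. A sketch on a plain TCPSocket; that :buffered => false is implemented this way is an assumption, since connect_to_server is not shown on this page.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)
socket = TCPSocket.new('127.0.0.1', server.addr[1])

# Same (level, optname, optval) arguments that #setsockopt passes through
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

# Read the option back; Socket::Option#bool is true for a non-zero value
nodelay = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY).bool

socket.close
server.close
```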
#write(data) ⇒ Object
Send data to the server
Use #retry_on_connection_failure to add resilience to the #write method
Raises ConnectionFailure whenever the send fails
For a description of the errors, see Socket#write
# File 'lib/resilient_socket/tcp_client.rb', line 328
def write(data)
  logger.trace("#write ==> sending", data)
  logger.benchmark_debug("#write ==> sent #{data.length} bytes") do
    begin
      @socket.write(data)
    rescue SystemCallError => exception
      logger.warn "#write Connection failure: #{exception.class}: #{exception.message}"
      close if close_on_error
      raise ConnectionFailure.new("Send Connection failure: #{exception.class}: #{exception.message}", @server, exception)
    rescue Exception
      # Close the connection on any other exception since the connection
      # will now be in an inconsistent state
      close if close_on_error
      raise
    end
  end
end