Class: FTW::Connection
- Inherits:
- Object
- Includes:
- Cabin::Inspectable, Poolable
- Defined in:
- lib/ftw/connection.rb
Overview
A network connection. This is TCP.
You can use IO::select on objects of this type (at least in MRI).
You can activate SSL/TLS on this connection by invoking FTW::Connection#secure.
This class also implements its own buffering because some IO-like classes (such as OpenSSL::SSL::SSLSocket) do not support IO#ungetbyte.
Defined Under Namespace
Classes: ConnectRefused, ConnectTimeout, ReadTimeout, SecureHandshakeTimeout, WriteTimeout
Instance Method Summary
- #client? ⇒ Boolean
  Is this a client connection?
- #connect(timeout = nil) ⇒ nil, StandardError or subclass
  Connect now.
- #connected? ⇒ Boolean
  Is this Connection connected?
- #disconnect(reason) ⇒ Object
  End this connection, specifying why.
- #peer ⇒ Object
  The host:port.
- #pushback(data) ⇒ Object
  Push back some data onto the connection’s read buffer.
- #read(length = 16384, timeout = nil) ⇒ Object
  Read data from this connection. This method blocks until the read succeeds unless a timeout is given.
- #readable?(timeout) ⇒ Boolean
  Is this connection readable? Returns true if it is readable within the timeout period.
- #secure(options = nil) ⇒ Object
  Secure this connection with TLS.
- #secured? ⇒ Boolean
  Has this connection been secured?
- #server? ⇒ Boolean
  Is this a server connection?
- #to_io ⇒ Object
  Support ‘to_io’ so you can use IO::select on this object.
- #writable?(timeout) ⇒ Boolean
  Is this connection writable? Returns true if it is writable within the timeout period.
- #write(data, timeout = nil) ⇒ Object
  Write data to this connection.
Methods included from Poolable
Instance Method Details
#client? ⇒ Boolean
Is this a client connection?
# File 'lib/ftw/connection.rb', line 406
def client?
  return @mode == :client
end
#connect(timeout = nil) ⇒ nil, StandardError or subclass
Connect now.
Timeout value is optional. If no timeout is given, this method blocks until a connection is successful or an error occurs.
You should check the return value of this method to determine if a connection was successful.
Possible return values on error include:
- FTW::Connection::ConnectRefused
- FTW::Connection::ConnectTimeout
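Because #connect reports failure through its return value rather than by raising, a caller may want a small wrapper that turns a returned error into an exception. The sketch below relies only on what the description above states: nil on success, an error object otherwise. `connect_or_raise` and `FakeConnection` are hypothetical names used for illustration and are not part of FTW.

```ruby
# Hypothetical helper (not part of FTW): call #connect and raise whatever
# error object it returns, so that a failure cannot be silently ignored.
def connect_or_raise(connection, timeout = nil)
  result = connection.connect(timeout)
  raise result if result.is_a?(StandardError)
  connection
end

# Stand-in for FTW::Connection so the sketch runs without the gem.
# Its #connect simply returns whatever it was constructed with.
class FakeConnection
  def initialize(connect_result)
    @connect_result = connect_result
  end

  def connect(timeout = nil)
    @connect_result
  end
end

ok = FakeConnection.new(nil)
connect_or_raise(ok, 5) # returns the connection; nothing is raised

refused = FakeConnection.new(IOError.new("connection refused"))
begin
  connect_or_raise(refused, 5)
rescue IOError => e
  warn "connect failed: #{e.message}"
end
```

With a real connection, the same check would apply to the ConnectRefused and ConnectTimeout objects listed above.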
# File 'lib/ftw/connection.rb', line 129
def connect(timeout=nil)
  # TODO(sissel): Raise if we're already connected?
  disconnect("reconnecting") if connected?
  host, port = @destinations.first.split(":")
  @destinations = @destinations.rotate # round-robin

  # Do dns resolution on the host. If there are multiple
  # addresses resolved, return one at random.
  addresses = FTW::DNS.singleton.resolve(host)

  addresses.each do |address|
    # Try each address until one works.
    @remote_address = address
    # Addresses with colon ':' in them are assumed to be IPv6
    family = @remote_address.include?(":") ? Socket::AF_INET6 : Socket::AF_INET
    @logger.debug("Connecting", :address => @remote_address,
                  :host => host, :port => port, :family => family)
    @socket = Socket.new(family, Socket::SOCK_STREAM, 0)
    @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

    # This api is terrible. pack_sockaddr_in? This isn't C, man...
    @logger.debug("packing", :data => [port.to_i, @remote_address])
    sockaddr = Socket.pack_sockaddr_in(port.to_i, @remote_address)
    # TODO(sissel): Support local address binding

    # Connect with timeout
    begin
      @socket.connect_nonblock(sockaddr)
    rescue IO::WaitWritable, Errno::EINPROGRESS
      # Ruby actually raises Errno::EINPROGRESS, but for some reason
      # the documentation says to use this IO::WaitWritable thing...
      # I don't get it, but whatever :(

      writable = writable?(timeout)

      # http://jira.codehaus.org/browse/JRUBY-6528; IO.select doesn't behave
      # correctly on JRuby < 1.7, so work around it.
      if writable || (RUBY_PLATFORM == "java" and JRUBY_VERSION < "1.7.0")
        begin
          @socket.connect_nonblock(sockaddr) # check connection failure
        rescue Errno::EISCONN
          # Ignore, we're already connected.
        rescue Errno::ECONNREFUSED => e
          # Fire 'disconnected' event with reason :refused
          @socket.close
          return ConnectRefused.new("#{host}[#{@remote_address}]:#{port}")
        rescue Errno::ETIMEDOUT
          # This occurs when the system's TCP timeout hits, we have no
          # control over this, as far as I can tell. *maybe* setsockopt(2)
          # has a flag for this, but I haven't checked..
          # TODO(sissel): We should instead do 'retry' unless we've exceeded
          # the timeout.
          @socket.close
          return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
        rescue Errno::EINPROGRESS
          # If we get here, it's likely JRuby version < 1.7.0. EINPROGRESS at
          # this point in the code means that we have timed out.
          @socket.close
          return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
        end
      else
        # Connection timeout;
        return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
      end

      # If no error at this point, we're now connected.
      @connected = true
      break
    end # addresses.each
  end
  return nil
end
#connected? ⇒ Boolean
Is this Connection connected?
# File 'lib/ftw/connection.rb', line 203
def connected?
  return @connected
end
#disconnect(reason) ⇒ Object
End this connection, specifying why.
# File 'lib/ftw/connection.rb', line 264
def disconnect(reason)
  if @socket.is_a?(OpenSSL::SSL::SSLSocket)
    @socket.sysclose()
  else
    begin
      @socket.close_read
    rescue IOError => e
      # Ignore, perhaps we shouldn't ignore.
    end

    begin
      @socket.close_write
    rescue IOError => e
      # Ignore, perhaps we shouldn't ignore.
    end
  end
end
#peer ⇒ Object
The host:port
# File 'lib/ftw/connection.rb', line 301
def peer
  return @remote_address
end
#pushback(data) ⇒ Object
Push back some data onto the connection’s read buffer.
# File 'lib/ftw/connection.rb', line 259
def pushback(data)
  @pushback_buffer << data
end
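To illustrate why a pushback buffer is useful, here is a minimal sketch in which a caller reads more than it needs, keeps the part it can parse, and hands the remainder back for the next read. `PushbackReader` is a hypothetical stand-in built on StringIO, not FTW's class; it only mimics the pushback-then-read ordering described above.

```ruby
require "stringio"

# Minimal stand-in (not FTW's class) showing the pushback idea: data pushed
# back is returned by the next read, ahead of anything still in the IO.
class PushbackReader
  def initialize(io)
    @io = io
    @pushback_buffer = String.new
  end

  def pushback(data)
    @pushback_buffer << data
  end

  def read(length = 16384)
    # Start with any pushed-back data, then top up from the underlying IO.
    data = @pushback_buffer
    @pushback_buffer = String.new
    remaining = length - data.length
    data << @io.read(remaining).to_s if remaining > 0
    data
  end
end

reader = PushbackReader.new(StringIO.new("HTTP/1.1 200 OK\r\n\r\nbody"))
chunk = reader.read(64)
header, body = chunk.split("\r\n\r\n", 2)
reader.pushback(body) # hand the unparsed remainder back
reader.read(64)       # the next read starts with the pushed-back bytes
```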
#read(length = 16384, timeout = nil) ⇒ Object
Read data from this connection. This method blocks until the read succeeds unless a timeout is given.
This method is not guaranteed to read exactly ‘length’ bytes. See IO#sysread.
# File 'lib/ftw/connection.rb', line 227
def read(length=16384, timeout=nil)
  data = ""
  data.force_encoding("BINARY") if data.respond_to?(:force_encoding)
  have_pushback = !@pushback_buffer.empty?
  if have_pushback
    data << @pushback_buffer
    @pushback_buffer = ""
    # We have data 'now' so don't wait.
    timeout = 0
  end

  if readable?(timeout)
    begin
      # Read at most 'length' data, so read less from the socket
      # We'll read less than 'length' if the pushback buffer has
      # data in it already.
      @socket.sysread(length - data.length, @read_buffer)
      data << @read_buffer
      return data
    rescue EOFError => e
      raise e
    end
  else
    if have_pushback
      return data
    else
      raise ReadTimeout.new
    end
  end
end
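Since a single #read may return fewer bytes than requested and signals the end of the stream by raising EOFError, callers that want the whole stream typically loop. The following is a sketch under those assumptions; `read_until_eof` and `ChunkedSource` are hypothetical names, and the stand-in class exists only so the example runs without the gem.

```ruby
# Hypothetical helper (not part of FTW): keep calling #read until the peer
# closes the connection, which surfaces as EOFError.
def read_until_eof(connection, chunk_size = 16384, timeout = nil)
  data = String.new
  begin
    loop { data << connection.read(chunk_size, timeout) }
  rescue EOFError
    # The peer closed the connection; everything received is in data.
  end
  data
end

# Stand-in for a connection: hands out preset chunks, then raises EOFError.
class ChunkedSource
  def initialize(chunks)
    @chunks = chunks.dup
  end

  def read(length = 16384, timeout = nil)
    raise EOFError, "end of stream" if @chunks.empty?
    @chunks.shift
  end
end

body = read_until_eof(ChunkedSource.new(["hel", "lo"]))
puts body
```

A read timeout is deliberately not rescued here, so with a real connection it would propagate to the caller.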
#readable?(timeout) ⇒ Boolean
Is this connection readable? Returns true if it is readable within the timeout period. False otherwise.
The timeout is in seconds. Fractional seconds are OK.
# File 'lib/ftw/connection.rb', line 295
def readable?(timeout)
  readable, writable, errors = IO.select([@socket], nil, nil, timeout)
  return !readable.nil?
end
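The readiness check above can be tried in isolation with a pipe. This sketch applies the same IO.select call to a plain IO object; `io_readable?` is a hypothetical name chosen for the example.

```ruby
# Same readiness check as #readable?, written against a plain IO object.
# IO.select returns nil when the timeout expires with nothing ready.
def io_readable?(io, timeout)
  ready = IO.select([io], nil, nil, timeout)
  !ready.nil?
end

reader, writer = IO.pipe
io_readable?(reader, 0.1) # nothing written yet, so this times out
writer.write("ping")
io_readable?(reader, 0.1) # data is now waiting to be read
```

A timeout of 0 polls without blocking, and nil blocks until the IO becomes ready.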
#secure(options = nil) ⇒ Object
Secure this connection with TLS.
Options:
- :certificate_store, an OpenSSL::X509::Store
- :timeout, a timeout threshold in seconds.
# File 'lib/ftw/connection.rb', line 316
def secure(options=nil)
  # Skip this if we're already secure.
  return if secured?

  defaults = {
    :timeout => nil,
    #:certificate_store => OpenSSL::SSL::SSLContext::DEFAULT_CERT_STORE
  }
  settings = defaults.merge(options) unless options.nil?

  @logger.info("Securing this connection", :peer => peer, :options => settings)
  # Wrap this connection with TLS/SSL
  sslcontext = OpenSSL::SSL::SSLContext.new
  # If you use VERIFY_NONE, you are removing the trust feature of TLS. Don't do that.
  # Encryption without trust means you don't know who you are talking to.
  sslcontext.verify_mode = OpenSSL::SSL::VERIFY_PEER
  sslcontext.verify_callback = proc do |*args|
    @logger.debug("Verify peer via FTW::Connection#secure", :callback => settings[:verify_callback])
    if settings[:verify_callback].respond_to?(:call)
      settings[:verify_callback].call(*args)
    end
  end
  sslcontext.ssl_version = :TLSv1
  sslcontext.cert_store = options[:certificate_store]
  @socket = OpenSSL::SSL::SSLSocket.new(@socket, sslcontext)

  # TODO(sissel): Set up local certificate/key stuff. This is required for
  # server-side ssl operation, I think.

  if client?
    do_secure(:connect_nonblock, options[:timeout])
  else
    do_secure(:accept_nonblock, options[:timeout])
  end
end
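A caller needs an OpenSSL::X509::Store to pass as :certificate_store. The sketch below builds one from the system's default CA locations using only Ruby's standard openssl library, then configures a context the way the listing above appears to. The final call is shown as a comment because it assumes an already-connected FTW::Connection named `connection`. As listed, the method reads from the options hash directly, so passing a hash rather than relying on the nil default seems advisable.

```ruby
require "openssl"

# Build a certificate store backed by the system's default CA locations.
store = OpenSSL::X509::Store.new
store.set_default_paths

# Roughly what #secure configures, per the listing above: peer verification
# is on, and the caller's store is used as the set of trusted certificates.
context = OpenSSL::SSL::SSLContext.new
context.verify_mode = OpenSSL::SSL::VERIFY_PEER
context.cert_store = store

# With a connected FTW::Connection named `connection` (assumed here):
#   connection.secure(:certificate_store => store, :timeout => 10)
```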
#secured? ⇒ Boolean
Has this connection been secured?
# File 'lib/ftw/connection.rb', line 401
def secured?
  return @secure
end
#server? ⇒ Boolean
Is this a server connection?
# File 'lib/ftw/connection.rb', line 411
def server?
  return @mode == :server
end
#to_io ⇒ Object
Support ‘to_io’ so you can use IO::select on this object.
# File 'lib/ftw/connection.rb', line 306
def to_io
  return @socket
end
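In MRI, IO.select accepts any object that responds to #to_io, which is what makes the method above useful. Here is a small sketch with a hypothetical wrapper class, `WrappedIO`, standing in for a connection; a pipe plays the role of the socket.

```ruby
# Stand-in wrapper (not FTW's class) that exposes its underlying IO via
# #to_io, which is what lets IO.select accept the wrapper directly.
class WrappedIO
  def initialize(io)
    @io = io
  end

  def to_io
    @io
  end
end

reader, writer = IO.pipe
wrapped = WrappedIO.new(reader)
writer.write("ping")

# Wait up to one second for any of the listed objects to become readable.
ready = IO.select([wrapped], nil, nil, 1)
puts "readable" unless ready.nil?
```

This allows several connections to be multiplexed in a single select call alongside ordinary IO objects.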
#writable?(timeout) ⇒ Boolean
Is this connection writable? Returns true if it is writable within the timeout period. False otherwise.
The timeout is in seconds. Fractional seconds are OK.
# File 'lib/ftw/connection.rb', line 286
def writable?(timeout)
  readable, writable, errors = IO.select(nil, [@socket], nil, timeout)
  return !writable.nil?
end
#write(data, timeout = nil) ⇒ Object
Write data to this connection. This method blocks until the write succeeds unless a timeout is given.
This method is not guaranteed to have written the full data given.
Returns the number of bytes written (see also IO#syswrite).
# File 'lib/ftw/connection.rb', line 213
def write(data, timeout=nil)
  #connect if !connected?
  if writable?(timeout)
    return @socket.syswrite(data)
  else
    raise FTW::Connection::WriteTimeout.new(self.inspect)
  end
end
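Because a single #write may accept only part of the data, callers that need everything sent usually loop on the returned byte count. This is one possible sketch; `write_all` and `ShortWriter` are hypothetical names, and the stand-in class simulates short writes so the example runs without the gem.

```ruby
# Hypothetical helper (not part of FTW): call #write repeatedly until every
# byte has been accepted, since one call may write only part of the data.
def write_all(connection, data, timeout = nil)
  remaining = data.b # binary copy, so byte offsets are exact
  until remaining.empty?
    written = connection.write(remaining, timeout)
    remaining = remaining.byteslice(written, remaining.bytesize - written)
  end
  data.bytesize
end

# Stand-in for a connection whose #write accepts at most `limit` bytes.
class ShortWriter
  attr_reader :received

  def initialize(limit)
    @limit = limit
    @received = String.new
  end

  def write(data, timeout = nil)
    chunk = data.byteslice(0, @limit)
    @received << chunk
    chunk.bytesize
  end
end

sink = ShortWriter.new(3)
write_all(sink, "hello world")
```

A write timeout is not rescued here, so with a real connection it would propagate to the caller.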