Class: Cql::Io::Connection
- Inherits: Object
  - Object
  - Cql::Io::Connection
- Defined in: lib/cql/io/connection.rb
Overview
A wrapper around a socket. Handles connecting to the remote host, and reading from and writing to the socket.
Instance Attribute Summary
- #connection_timeout ⇒ Object (readonly)
  Returns the value of attribute connection_timeout.
- #host ⇒ Object (readonly)
  Returns the value of attribute host.
- #port ⇒ Object (readonly)
  Returns the value of attribute port.
Instance Method Summary
- #close(cause = nil) ⇒ Object
  Closes the connection.
- #closed? ⇒ Boolean
  Returns true if the connection is closed.
- #connect ⇒ Object
- #connected? ⇒ Boolean
  Returns true if the connection is connected.
- #connecting? ⇒ Boolean
- #flush ⇒ Object
- #initialize(host, port, connection_timeout, unblocker, clock, socket_impl = Socket) ⇒ Connection (constructor)
  A new instance of Connection.
- #on_closed {|error, nil| ... } ⇒ Object
  Register to receive a notification when the socket is closed, both for expected and unexpected reasons.
- #on_data {|String| ... } ⇒ Object
  Register to receive notifications when new data is read from the socket.
- #read ⇒ Object
- #to_io ⇒ Object
- #to_s ⇒ Object
- #writable? ⇒ Boolean
- #write(bytes = nil) {|buffer| ... } ⇒ Object
  Write bytes to the socket.
Constructor Details
#initialize(host, port, connection_timeout, unblocker, clock, socket_impl = Socket) ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/cql/io/connection.rb', line 16

    def initialize(host, port, connection_timeout, unblocker, clock, socket_impl=Socket)
      @host = host
      @port = port
      @connection_timeout = connection_timeout
      @unblocker = unblocker
      @clock = clock
      @socket_impl = socket_impl
      @lock = Mutex.new
      @connected = false
      @write_buffer = ByteBuffer.new
      @connected_promise = Promise.new
    end
Instance Attribute Details
#connection_timeout ⇒ Object (readonly)
Returns the value of attribute connection_timeout.
    # File 'lib/cql/io/connection.rb', line 13

    def connection_timeout
      @connection_timeout
    end
#host ⇒ Object (readonly)
Returns the value of attribute host.
    # File 'lib/cql/io/connection.rb', line 13

    def host
      @host
    end
#port ⇒ Object (readonly)
Returns the value of attribute port.
    # File 'lib/cql/io/connection.rb', line 13

    def port
      @port
    end
Instance Method Details
#close(cause = nil) ⇒ Object
Closes the connection.

    # File 'lib/cql/io/connection.rb', line 69

    def close(cause=nil)
      return false unless @io
      begin
        @io.close
      rescue SystemCallError, IOError
        # nothing to do, the socket was most likely already closed
      end
      closed!(cause)
      true
    end
#closed? ⇒ Boolean
Returns true if the connection is closed.

    # File 'lib/cql/io/connection.rb', line 91

    def closed?
      @io.nil?
    end
#connect ⇒ Object
    # File 'lib/cql/io/connection.rb', line 30

    def connect
      begin
        unless @addrinfos
          @connection_started_at = @clock.now
          @addrinfos = @socket_impl.getaddrinfo(@host, @port, nil, Socket::SOCK_STREAM)
        end
        unless @io
          _, port, _, ip, address_family, socket_type = @addrinfos.shift
          @sockaddr = @socket_impl.sockaddr_in(port, ip)
          @io = @socket_impl.new(address_family, socket_type, 0)
        end
        unless connected?
          @io.connect_nonblock(@sockaddr)
          @connected = true
          @connected_promise.fulfill(self)
        end
      rescue Errno::EISCONN
        @connected = true
        @connected_promise.fulfill(self)
      rescue Errno::EINPROGRESS, Errno::EALREADY
        if @clock.now - @connection_started_at > @connection_timeout
          close(ConnectionTimeoutError.new("Could not connect to #{@host}:#{@port} within #{@connection_timeout}s"))
        end
      rescue Errno::EINVAL => e
        if @addrinfos.empty?
          close(e)
        else
          @io = nil
          retry
        end
      rescue SystemCallError => e
        close(e)
      rescue SocketError => e
        close(e) || closed!(e)
      end
      @connected_promise.future
    end
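The non-blocking connect sequence used by #connect can be tried outside the library with Ruby's standard Socket class alone. The following is a minimal sketch under stated assumptions, not the library's code: `nonblocking_connect` is a hypothetical helper, it reads `Time.now` instead of an injected clock, it waits with `IO.select` instead of being driven by a reactor, it raises a plain RuntimeError on timeout, and it tries only the first resolved address.

```ruby
require 'socket'

# Hypothetical helper (not part of the library): resolves the host, then
# repeats connect_nonblock until the socket is connected or the timeout passes.
def nonblocking_connect(host, port, timeout)
  started_at = Time.now
  addrinfos = Socket.getaddrinfo(host, port, nil, Socket::SOCK_STREAM)
  _, resolved_port, _, ip, address_family, socket_type = addrinfos.first
  sockaddr = Socket.sockaddr_in(resolved_port, ip)
  io = Socket.new(address_family, socket_type, 0)
  begin
    io.connect_nonblock(sockaddr)
  rescue Errno::EISCONN
    # A repeated connect on an established socket: the connection is ready.
  rescue Errno::EINPROGRESS, Errno::EALREADY
    if Time.now - started_at > timeout
      io.close
      raise "Could not connect to #{host}:#{port} within #{timeout}s"
    end
    # Wait briefly for the socket to become writable, then try again.
    IO.select(nil, [io], nil, 0.1)
    retry
  end
  io
end

# Usage: connect to a throwaway local server listening on an ephemeral port.
server = TCPServer.new('127.0.0.1', 0)
socket = nonblocking_connect('127.0.0.1', server.addr[1], 5)
accepted = server.accept
socket.write('ping')
received = accepted.readpartial(4)
[socket, accepted, server].each(&:close)
```

In the class itself, the retry is not a loop inside the method: each call to #connect makes one attempt and returns the future of the connection promise, so the caller decides when to call it again.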
#connected? ⇒ Boolean
Returns true if the connection is connected.

    # File 'lib/cql/io/connection.rb', line 86

    def connected?
      @connected
    end
#connecting? ⇒ Boolean
    # File 'lib/cql/io/connection.rb', line 81

    def connecting?
      !(closed? || connected?)
    end
#flush ⇒ Object
    # File 'lib/cql/io/connection.rb', line 167

    def flush
      if writable?
        @lock.synchronize do
          s = @write_buffer.cheap_peek.dup
          bytes_written = @io.write_nonblock(@write_buffer.cheap_peek)
          @write_buffer.discard(bytes_written)
        end
      end
    rescue => e
      close(e)
    end
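The listing above writes whatever the socket accepts and discards only that many bytes from the buffer, so a partial write leaves the remainder for the next call. A minimal sketch of that pattern, assuming a plain binary String in place of ByteBuffer and a pipe in place of a socket; `flush_once` is a hypothetical helper, and unlike #flush it treats a would-block condition as "nothing written" rather than closing anything.

```ruby
# Hypothetical helper: writes as much of `buffer` as the OS accepts right now
# and removes exactly those bytes from the front of the buffer.
def flush_once(io, buffer)
  return 0 if buffer.empty?
  bytes_written = io.write_nonblock(buffer)
  buffer.slice!(0, bytes_written) # binary string, so this slices by byte
  bytes_written
rescue IO::WaitWritable
  0 # the descriptor is not writable yet; keep the buffer untouched
end

# Usage: drain a small buffer into a pipe and read it back.
reader, writer = IO.pipe
buffer = 'hello world'.b
flush_once(writer, buffer) until buffer.empty?
writer.close
flushed = reader.read
reader.close
```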
#on_closed {|error, nil| ... } ⇒ Object
Register to receive a notification when the socket is closed, both for expected and unexpected reasons.
You should only call this method in your protocol handler constructor.
Only one callback can be registered; if you register multiple times, only the last one will receive notifications. This is not meant as a general event system; it is only for protocol handlers to be notified of the connection closing. If you want multiple listeners, you need to implement that yourself in your protocol handler.
Errors raised by the callback will be ignored.
    # File 'lib/cql/io/connection.rb', line 142

    def on_closed(&listener)
      @closed_listener = listener
    end
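The single-listener behavior described above can be illustrated without the library. The sketch below is an assumption-laden stand-in: `ClosableStub` and `simulate_close` are hypothetical names, and the way callback errors are swallowed here (a rescue around the call) is a guess at one possible approach, since the listing does not show how the library does it.

```ruby
# Hypothetical stand-in for the listener bookkeeping; not the library's class.
class ClosableStub
  # Stores the block, replacing any previously registered listener.
  def on_closed(&listener)
    @closed_listener = listener
  end

  # Simulates the connection closing with an optional cause.
  def simulate_close(cause = nil)
    @closed_listener.call(cause) if @closed_listener
  rescue StandardError
    nil # assumption: errors raised by the callback are ignored
  end
end

events = []
stub = ClosableStub.new
stub.on_closed { |cause| events << [:first, cause] }
stub.on_closed { |cause| events << [:second, cause] } # replaces the first
stub.simulate_close

stub.on_closed { |_cause| raise 'listener bug' }
stub.simulate_close(IOError.new('boom')) # does not raise
```

Only the second block is notified by the first simulated close, which is why a protocol handler that needs several listeners has to fan the notification out itself.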
#on_data {|String| ... } ⇒ Object
Register to receive notifications when new data is read from the socket.
You should only call this method in your protocol handler constructor.
Only one callback can be registered; if you register multiple times, only the last one will receive notifications. This is not meant as a general event system; it is only for protocol handlers to receive data from their connection. If you want multiple listeners, you need to implement that yourself in your protocol handler.
Do not do any heavy lifting in the callback: it is called from the IO reactor thread, and while the callback is running the reactor cannot handle any IO or call any other callbacks.
Errors raised by the callback will be ignored.
    # File 'lib/cql/io/connection.rb', line 122

    def on_data(&listener)
      @data_listener = listener
    end
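One way to keep the callback light is to have it only enqueue the bytes and let another thread do the expensive work. This is a sketch of that design choice, not something the library prescribes: `DataSourceStub`, `simulate_read` and `QueueingHandler` are hypothetical names, and `upcase` stands in for real decoding.

```ruby
# Hypothetical stand-in for a connection that delivers data to one listener.
class DataSourceStub
  def on_data(&listener)
    @data_listener = listener
  end

  # Simulates the reactor thread reading bytes from the socket.
  def simulate_read(bytes)
    @data_listener.call(bytes) if @data_listener
  end
end

# Hypothetical protocol handler: registers in its constructor, and the
# callback does nothing except push the chunk onto a thread-safe queue.
class QueueingHandler
  attr_reader :processed

  def initialize(connection)
    @queue = Queue.new
    @processed = []
    connection.on_data { |bytes| @queue << bytes }
    @worker = Thread.new do
      while (chunk = @queue.pop) != :stop
        @processed << chunk.upcase # stand-in for expensive decoding
      end
    end
  end

  # Asks the worker to finish the queued chunks and waits for it.
  def stop
    @queue << :stop
    @worker.join
  end
end

connection = DataSourceStub.new
handler = QueueingHandler.new(connection)
connection.simulate_read('abc')
connection.simulate_read('def')
handler.stop
```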
#read ⇒ Object
    # File 'lib/cql/io/connection.rb', line 180

    def read
      new_data = @io.read_nonblock(2**16)
      @data_listener.call(new_data) if @data_listener
    rescue => e
      close(e)
    end
#to_io ⇒ Object
    # File 'lib/cql/io/connection.rb', line 188

    def to_io
      @io
    end
#to_s ⇒ Object
    # File 'lib/cql/io/connection.rb', line 192

    def to_s
      state = 'inconsistent'
      if connected?
        state = 'connected'
      elsif connecting?
        state = 'connecting'
      elsif closed?
        state = 'closed'
      end
      %(#<#{self.class.name} #{state} #{@host}:#{@port}>)
    end
#writable? ⇒ Boolean
    # File 'lib/cql/io/connection.rb', line 96

    def writable?
      empty_buffer = @lock.synchronize do
        @write_buffer.empty?
      end
      !(closed? || empty_buffer)
    end
#write(bytes = nil) {|buffer| ... } ⇒ Object
Write bytes to the socket.
You can either pass in bytes (as a string or as a ByteBuffer), or you can use the block form of this method to get access to the connection's internal buffer.

    # File 'lib/cql/io/connection.rb', line 155

    def write(bytes=nil)
      @lock.synchronize do
        if block_given?
          yield @write_buffer
        elsif bytes
          @write_buffer.append(bytes)
        end
      end
      @unblocker.unblock!
    end
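The two calling forms can be compared with a small stand-in. This is only a sketch: `BufferedWriterStub` and its `buffered` reader are hypothetical, a binary String replaces ByteBuffer (so `<<` is used instead of `append`), and the unblocker call is left out.

```ruby
# Hypothetical stand-in that mirrors the locking in the listing above.
class BufferedWriterStub
  def initialize
    @lock = Mutex.new
    @write_buffer = String.new # binary String instead of ByteBuffer
  end

  # Appends `bytes`, or yields the internal buffer when a block is given.
  def write(bytes = nil)
    @lock.synchronize do
      if block_given?
        yield @write_buffer
      elsif bytes
        @write_buffer << bytes
      end
    end
  end

  # Returns a copy of the buffered bytes, for inspection only.
  def buffered
    @lock.synchronize { @write_buffer.dup }
  end
end

writer = BufferedWriterStub.new
writer.write('HEAD')       # bytes form: a single append
writer.write do |buffer|   # block form: several appends under one lock
  buffer << 'BODY1'
  buffer << 'BODY2'
end
```

Because the block runs while the lock is held, the block form lets a caller append several pieces without another thread's write landing between them.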