Class: Cql::Io::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/cql/io/connection.rb

Overview

A wrapper around a socket. Handles connecting to the remote host, reading from and writing to the socket.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, connection_timeout, unblocker, clock, socket_impl = Socket) ⇒ Connection

Returns a new instance of Connection.



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/cql/io/connection.rb', line 16

def initialize(host, port, connection_timeout, unblocker, clock, socket_impl=Socket)
  @host = host
  @port = port
  @connection_timeout = connection_timeout
  @unblocker = unblocker
  @clock = clock
  @socket_impl = socket_impl
  @lock = Mutex.new
  @connected = false
  @write_buffer = ByteBuffer.new
  @connected_promise = Promise.new
end

Instance Attribute Details

#connection_timeoutObject (readonly)

Returns the value of attribute connection_timeout.



13
14
15
# File 'lib/cql/io/connection.rb', line 13

def connection_timeout
  @connection_timeout
end

#hostObject (readonly)

Returns the value of attribute host.



13
14
15
# File 'lib/cql/io/connection.rb', line 13

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



13
14
15
# File 'lib/cql/io/connection.rb', line 13

def port
  @port
end

Instance Method Details

#close(cause = nil) ⇒ Object

Closes the connection



69
70
71
72
73
74
75
76
77
78
# File 'lib/cql/io/connection.rb', line 69

def close(cause=nil)
  return false unless @io
  begin
    @io.close
  rescue SystemCallError, IOError
    # nothing to do, the socket was most likely already closed
  end
  closed!(cause)
  true
end

#closed?Boolean

Returns true if the connection is closed

Returns:

  • (Boolean)


91
92
93
# File 'lib/cql/io/connection.rb', line 91

def closed?
  @io.nil?
end

#connectObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/cql/io/connection.rb', line 30

def connect
  begin
    unless @addrinfos
      @connection_started_at = @clock.now
      @addrinfos = @socket_impl.getaddrinfo(@host, @port, nil, Socket::SOCK_STREAM)
    end
    unless @io
      _, port, _, ip, address_family, socket_type = @addrinfos.shift
      @sockaddr = @socket_impl.sockaddr_in(port, ip)
      @io = @socket_impl.new(address_family, socket_type, 0)
    end
    unless connected?
      @io.connect_nonblock(@sockaddr)
      @connected = true
      @connected_promise.fulfill(self)
    end
  rescue Errno::EISCONN
    @connected = true
    @connected_promise.fulfill(self)
  rescue Errno::EINPROGRESS, Errno::EALREADY
    if @clock.now - @connection_started_at > @connection_timeout
      close(ConnectionTimeoutError.new("Could not connect to #{@host}:#{@port} within #{@connection_timeout}s"))
    end
  rescue Errno::EINVAL => e
    if @addrinfos.empty?
      close(e)
    else
      @io = nil
      retry
    end
  rescue SystemCallError => e
    close(e)
  rescue SocketError => e
    close(e) || closed!(e)
  end
  @connected_promise.future
end

#connected?Boolean

Returns true if the connection is connected

Returns:

  • (Boolean)


86
87
88
# File 'lib/cql/io/connection.rb', line 86

def connected?
  @connected
end

#connecting?Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/cql/io/connection.rb', line 81

def connecting?
  !(closed? || connected?)
end

#flushObject



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/cql/io/connection.rb', line 167

def flush
  if writable?
    @lock.synchronize do
      s = @write_buffer.cheap_peek.dup
      bytes_written = @io.write_nonblock(@write_buffer.cheap_peek)
      @write_buffer.discard(bytes_written)
    end
  end
rescue => e
  close(e)
end

#on_closed {|error, nil| ... } ⇒ Object

Register to receive a notification when the socket is closed, both for expected and unexpected reasons.

You shoud only call this method in your protocol handler constructor.

Only one callback can be registered, if you register multiple times only the last one will receive notifications. This is not meant as a general event system, it’s just for protocol handlers to be notified of the connection closing. If you want multiple listeners you need to implement that yourself in your protocol handler.

Errors raised by the callback will be ignored.

Yields:

  • (error, nil)

    the error that caused the socket to close, or nil if the socket closed with #close



142
143
144
# File 'lib/cql/io/connection.rb', line 142

def on_closed(&listener)
  @closed_listener = listener
end

#on_data {|String| ... } ⇒ Object

Register to receive notifications when new data is read from the socket.

You should only call this method in your protocol handler constructor.

Only one callback can be registered, if you register multiple times only the last one will receive notifications. This is not meant as a general event system, it’s just for protocol handlers to receive data from their connection. If you want multiple listeners you need to implement that yourself in your protocol handler.

It is very important that you don’t do any heavy lifting in the callback since it is called from the IO reactor thread, and as long as the callback is working the reactor can’t handle any IO and no other callbacks can be called.

Errors raised by the callback will be ignored.

Yields:

  • (String)

    the new data



122
123
124
# File 'lib/cql/io/connection.rb', line 122

def on_data(&listener)
  @data_listener = listener
end

#readObject



180
181
182
183
184
185
# File 'lib/cql/io/connection.rb', line 180

def read
  new_data = @io.read_nonblock(2**16)
  @data_listener.call(new_data) if @data_listener
rescue => e
  close(e)
end

#to_ioObject



188
189
190
# File 'lib/cql/io/connection.rb', line 188

def to_io
  @io
end

#to_sObject



192
193
194
195
196
197
198
199
200
201
202
# File 'lib/cql/io/connection.rb', line 192

def to_s
  state = 'inconsistent'
  if connected?
    state = 'connected'
  elsif connecting?
    state = 'connecting'
  elsif closed?
    state = 'closed'
  end
  %(#<#{self.class.name} #{state} #{@host}:#{@port}>)
end

#writable?Boolean

Returns:

  • (Boolean)


96
97
98
99
100
101
# File 'lib/cql/io/connection.rb', line 96

def writable?
  empty_buffer = @lock.synchronize do
    @write_buffer.empty?
  end
  !(closed? || empty_buffer)
end

#write(bytes = nil) {|buffer| ... } ⇒ Object

Write bytes to the socket.

You can either pass in bytes (as a string or as a ‘ByteBuffer`), or you can use the block form of this method to get access to the connection’s internal buffer.

Parameters:

  • bytes (String, Cql::ByteBuffer) (defaults to: nil)

    the data to write to the socket

Yield Parameters:



155
156
157
158
159
160
161
162
163
164
# File 'lib/cql/io/connection.rb', line 155

def write(bytes=nil)
  @lock.synchronize do
    if block_given?
      yield @write_buffer
    elsif bytes
      @write_buffer.append(bytes)
    end
  end
  @unblocker.unblock!
end