Class: ZooKeeper::RubyIO::Connection
- Inherits:
-
Object
- Object
- ZooKeeper::RubyIO::Connection
- Includes:
- Slf4r::Logger, Socket::Constants, Protocol
- Defined in:
- lib/zkruby/rubyio.rb
Constant Summary
Constants included from Protocol
Instance Method Summary collapse
- #disconnect ⇒ Object
-
#initialize(host, port, timeout, session) ⇒ Connection
constructor
A new instance of Connection.
- #read_loop ⇒ Object
-
#receive_records(packet_io) ⇒ Object
Protocol requirement.
-
#send_data(data) ⇒ Object
This is called from random client threads, but only within a @session.synchronized() block.
-
#write_loop(socket) ⇒ Object
Since this runs in its very own thread we can use boring blocking IO.
Methods included from Protocol
Constructor Details
#initialize(host, port, timeout, session) ⇒ Connection
Returns a new instance of Connection.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/zkruby/rubyio.rb', line 33 def initialize(host,port,timeout,session) @session = session @write_queue = Queue.new() # JRuby cannot do non-blocking connects, which means there is # no way to properly implement the connection-timeout # See http://jira.codehaus.org/browse/JRUBY-5165 # In any case this should be encapsulated in TCPSocket.open(host,port,timeout) if RUBY_PLATFORM == "java" begin sock = TCPSocket.new(host,port.to_i) rescue Errno::ECONNREFUSED logger.warn("TCP Connection refused to #{host}:#{port}") sock = nil end else addr = Socket.getaddrinfo(host, nil) sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0) sock.setsockopt(SOL_SOCKET, SO_LINGER, [0,-1].pack("ii")) sock.setsockopt(SOL_TCP, TCP_NODELAY,[0].pack("i_")) sockaddr = Socket.pack_sockaddr_in(port, addr[0][3]) begin sock.connect_nonblock(sockaddr) rescue Errno::EINPROGRESS resp = IO.select(nil, [sock], nil, timeout) begin sock.connect_nonblock(sockaddr) rescue Errno::ECONNREFUSED logger.warn("Connection refused to #{ host }:#{ port }") sock = nil rescue Errno::EISCONN end end end @socket = sock Thread.new(sock) { |sock| write_loop(sock) } if sock end |
Instance Method Details
#disconnect ⇒ Object
129 130 131 132 133 134 135 136 |
# File 'lib/zkruby/rubyio.rb', line 129 def disconnect() socket = @socket @socket = nil socket.close if socket rescue Exception => ex #oh well logger.debug("Exception closing socket",ex) end |
#read_loop ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/zkruby/rubyio.rb', line 97 def read_loop() socket = @socket ping = 0 while socket # effectively forever begin data = socket.read_nonblock(1024) logger.debug { "Received (#{data.length})" + data.unpack("H*")[0] } receive_data(data) ping = 0 rescue IO::WaitReadable select_result = IO.select([socket],[],[],@session.ping_interval) unless select_result ping += 1 # two timeouts in a row mean we need to send a ping case ping when 1 ; @session.synchronize { @session.ping() } when 2 logger.debug{"No response to ping in #{@session.ping_interval}*2"} break end end rescue EOFError logger.debug { "EOF reading from socket" } break rescue Exception => ex logger.warn( "#{ex.class} exception in readloop",ex ) break end end disconnect() end |
#receive_records(packet_io) ⇒ Object
Protocol requirement
139 140 141 |
# File 'lib/zkruby/rubyio.rb', line 139 def receive_records(packet_io) @session.synchronize { @session.receive_records(packet_io) } end |
#send_data(data) ⇒ Object
This is called from random client threads, but only within a @session.synchronized() block
73 74 75 |
# File 'lib/zkruby/rubyio.rb', line 73 def send_data(data) @write_queue.push(data) end |
#write_loop(socket) ⇒ Object
Since this runs in its very own thread we can use boring blocking IO
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/zkruby/rubyio.rb', line 79 def write_loop(socket) Thread.current[:name] = "ZooKeeper::RubyIO::WriteLoop" begin while socket data = @write_queue.pop() if socket.write(data) != data.length() #TODO - will this really ever happen logger.warn("Incomplete write!") end logger.debug { "Sending: " + data.unpack("H*")[0] } end rescue Exception => ex logger.warn("Exception in write loop",ex) disconnect() end end |