Module: ZooKeeper::Protocol
- Includes:
- Slf4r::Logger
- Included in:
- EventMachine::ClientConn, RubyIO::Connection
- Defined in:
- lib/zkruby/protocol.rb
Overview
Raw object protocol, very similar to EM’s native ObjectProtocol Expects:
#receive_data and #send_records to be invoked
#receive_records and #send_data to be implemented
Constant Summary collapse
- MIN_PACKET =
TODO Work out what the min packet size really is
5
Instance Method Summary collapse
-
#receive_data(data) ⇒ Object
:nodoc:.
- #receive_records(packet_io) ⇒ Object
- #send_records(*records) ⇒ Object
Instance Method Details
#receive_data(data) ⇒ Object
:nodoc:
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/zkruby/protocol.rb', line 16 def receive_data data # :nodoc: @buffer ||= StringIO.new() @buffer.seek(0, IO::SEEK_END) @buffer << data @buffer.rewind logger.debug { "Received #{data.length} bytes: buffer length = #{@buffer.length} pos = #{@buffer.pos}" } loop do if @buffer.length - @buffer.pos > MIN_PACKET packet_size = @buffer.read(4).unpack("N").first if (@buffer.length - @buffer.pos >= packet_size) expected_pos = @buffer.pos + packet_size # We just pass the buffer around and expect packet_size to be consumed receive_records(@buffer) if (@buffer.pos != expected_pos) #this can happen during disconnection with left over packets #the connection is dying anyway leftover = @buffer.read(packet_size).unpack("H*")[0] raise ProtocolError, "Records not consumed #{leftover}" end logger.debug { "Consumed packet #{packet_size}. Buffer pos=#{@buffer.pos}, length=#{@buffer.length}" } next else # found the last partial packet @buffer.seek(-4, IO::SEEK_CUR) logger.debug { "Buffer contains #{@buffer.length} of #{packet_size} packet" } end end break end # reset the buffer @buffer = StringIO.new(@buffer.read()) if @buffer.pos > 0 end |
#receive_records(packet_io) ⇒ Object
50 51 52 53 |
# File 'lib/zkruby/protocol.rb', line 50 def receive_records(packet_io) #stub #we don't unpack records here because we don't know what kind of records they are! end |
#send_records(*records) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/zkruby/protocol.rb', line 55 def send_records(*records) length = 0 bindata = records.collect { |r| s = r.to_binary_s; length += s.length; s } send_data([length].pack("N")) bindata.each { |b| send_data(b) } logger.debug { "Sent #{length} byte packet containing #{records.length} records" } end |