Module: ZooKeeper::Protocol

Includes:
Slf4r::Logger
Included in:
EventMachine::ClientConn, RubyIO::Connection
Defined in:
lib/zkruby/protocol.rb

Overview

Raw object protocol, very similar to EM’s native ObjectProtocol. Expects:

#receive_data and #send_records to be invoked
#receive_records and #send_data to be implemented

Constant Summary collapse

MIN_PACKET =

TODO: Work out what the minimum packet size really is.

5

Instance Method Summary collapse

Instance Method Details

#receive_data(data) ⇒ Object

:nodoc:



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/zkruby/protocol.rb', line 16

# Appends +data+ to the internal receive buffer and hands every complete
# packet found in it to #receive_records.
#
# A packet is a 4-byte big-endian length prefix followed by that many payload
# bytes. #receive_records is given the buffer itself, positioned just past
# the prefix, and must advance it by exactly the payload length.
#
# data:: the bytes just read from the connection.
#
# Raises ProtocolError when #receive_records leaves the buffer anywhere other
# than at the end of the current packet.
def receive_data(data) # :nodoc:
  @buffer ||= StringIO.new
  # Append at the end, then rewind so parsing starts from the oldest byte.
  @buffer.seek(0, IO::SEEK_END)
  @buffer << data
  @buffer.rewind
  logger.debug { "Received #{data.length} bytes: buffer length = #{@buffer.length} pos = #{@buffer.pos}" }

  loop do
    # NOTE(review): strictly more than MIN_PACKET pending bytes are required
    # before a length prefix is even read, so a shorter frame stays buffered
    # until more data arrives - confirm the real minimum packet size.
    break unless @buffer.length - @buffer.pos > MIN_PACKET

    packet_size = @buffer.read(4).unpack("N").first

    if @buffer.length - @buffer.pos < packet_size
      # Only part of the last packet has arrived: step back over the length
      # prefix so it is parsed again on the next call.
      @buffer.seek(-4, IO::SEEK_CUR)
      logger.debug { "Buffer contains #{@buffer.length} of #{packet_size} packet" }
      break
    end

    expected_pos = @buffer.pos + packet_size
    # The buffer itself is passed around; exactly packet_size bytes are
    # expected to be consumed from it.
    receive_records(@buffer)

    if @buffer.pos != expected_pos
      # This can happen during disconnection with left over packets; the
      # connection is dying anyway.
      # StringIO#read(n) returns nil at end-of-buffer (e.g. after an
      # over-read), so coerce to a String to make sure ProtocolError - and
      # not NoMethodError - is what reaches the caller.
      leftover = @buffer.read(packet_size).to_s.unpack("H*").first
      raise ProtocolError, "Records not consumed #{leftover}"
    end

    logger.debug { "Consumed packet #{packet_size}. Buffer pos=#{@buffer.pos}, length=#{@buffer.length}" }
  end

  # Drop everything already consumed, keeping only the unparsed tail.
  @buffer = StringIO.new(@buffer.read) if @buffer.pos > 0
end

#receive_records(packet_io) ⇒ Object



50
51
52
53
# File 'lib/zkruby/protocol.rb', line 50

# Hook invoked by #receive_data once per complete packet.
#
# This default implementation is a stub that does nothing: the record types
# carried by a packet are not known at this level, so nothing is unpacked
# here.
#
# packet_io:: the receive buffer handed over by #receive_data.
def receive_records(packet_io)
  nil
end

#send_records(*records) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/zkruby/protocol.rb', line 55

# Serializes +records+ and sends them as a single length-prefixed packet.
#
# The 4-byte big-endian prefix carries the combined size of all serialized
# records. The prefix and each serialized record are passed to #send_data in
# separate calls, in order.
#
# records:: objects responding to +to_binary_s+.
def send_records(*records)
  bindata = records.map { |record| record.to_binary_s }
  # The prefix must count bytes, not characters: String#length under-counts
  # whenever a serialized record is tagged with a multibyte encoding.
  length = bindata.inject(0) { |total, bytes| total + bytes.bytesize }
  send_data([length].pack("N"))
  bindata.each { |bytes| send_data(bytes) }
  logger.debug { "Sent #{length} byte packet containing  #{records.length} records" }
end