Class: Kafka::Protocol::Decoder
- Inherits:
-
Object
- Object
- Kafka::Protocol::Decoder
- Defined in:
- lib/kafka/protocol/decoder.rb
Overview
A decoder wraps an IO object, making it easy to read specific data types from it. The Kafka protocol is not self-describing, so a client must call these methods in just the right order for things to work.
Constant Summary collapse
- VARINT_MASK =
0b10000000
Class Method Summary collapse
Instance Method Summary collapse
-
#array(&block) ⇒ Array
Decodes an array from the IO object.
-
#boolean ⇒ Boolean
Decodes an 8-bit boolean from the IO object.
-
#bytes ⇒ String
Decodes a list of bytes from the IO object.
- #eof? ⇒ Boolean
-
#initialize(io) ⇒ Decoder
constructor
Initializes a new decoder.
-
#int16 ⇒ Integer
Decodes a 16-bit integer from the IO object.
-
#int32 ⇒ Integer
Decodes a 32-bit integer from the IO object.
-
#int64 ⇒ Integer
Decodes a 64-bit integer from the IO object.
-
#int8 ⇒ Integer
Decodes an 8-bit integer from the IO object.
-
#peek(offset, length) ⇒ Integer
Get some next bytes without touching the current io offset.
-
#read(number_of_bytes) ⇒ String
Reads the specified number of bytes from the IO object, returning them as a String.
-
#string ⇒ String
Decodes a string from the IO object.
-
#varint ⇒ Integer
Read an integer under varints serializing from the IO object.
-
#varint_array(&block) ⇒ Array
Decodes an array from the IO object.
-
#varint_bytes ⇒ String
Decodes a list of bytes from the IO object.
-
#varint_string ⇒ String
Decodes a string from the IO object, the size is in varint format.
Constructor Details
#initialize(io) ⇒ Decoder
Initializes a new decoder.
18 19 20 |
# File 'lib/kafka/protocol/decoder.rb', line 18 def initialize(io) @io = io end |
Class Method Details
.from_string(str) ⇒ Object
11 12 13 |
# File 'lib/kafka/protocol/decoder.rb', line 11 def self.from_string(str) new(StringIO.new(str)) end |
Instance Method Details
#array(&block) ⇒ Array
Decodes an array from the IO object.
The provided block will be called once for each item in the array. It is the responsibility of the block to decode the proper type in the block, since there's no information that allows the type to be inferred automatically.
79 80 81 82 |
# File 'lib/kafka/protocol/decoder.rb', line 79 def array(&block) size = int32 size.times.map(&block) end |
#boolean ⇒ Boolean
Decodes an 8-bit boolean from the IO object.
39 40 41 |
# File 'lib/kafka/protocol/decoder.rb', line 39 def boolean read(1) == 0x1 end |
#bytes ⇒ String
Decodes a list of bytes from the IO object.
138 139 140 141 142 143 144 145 146 |
# File 'lib/kafka/protocol/decoder.rb', line 138 def bytes size = int32 if size == -1 nil else read(size) end end |
#eof? ⇒ Boolean
22 23 24 |
# File 'lib/kafka/protocol/decoder.rb', line 22 def eof? @io.eof? end |
#int16 ⇒ Integer
Decodes a 16-bit integer from the IO object.
53 54 55 |
# File 'lib/kafka/protocol/decoder.rb', line 53 def int16 read(2).unpack("s>").first end |
#int32 ⇒ Integer
Decodes a 32-bit integer from the IO object.
60 61 62 |
# File 'lib/kafka/protocol/decoder.rb', line 60 def int32 read(4).unpack("l>").first end |
#int64 ⇒ Integer
Decodes a 64-bit integer from the IO object.
67 68 69 |
# File 'lib/kafka/protocol/decoder.rb', line 67 def int64 read(8).unpack("q>").first end |
#int8 ⇒ Integer
Decodes an 8-bit integer from the IO object.
46 47 48 |
# File 'lib/kafka/protocol/decoder.rb', line 46 def int8 read(1).unpack("C").first end |
#peek(offset, length) ⇒ Integer
Get some next bytes without touching the current io offset
29 30 31 32 33 34 |
# File 'lib/kafka/protocol/decoder.rb', line 29 def peek(offset, length) data = @io.read(offset + length) return [] if data.nil? @io.ungetc(data) data.bytes[offset, offset + length] || [] end |
#read(number_of_bytes) ⇒ String
Reads the specified number of bytes from the IO object, returning them as a String.
165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/kafka/protocol/decoder.rb', line 165 def read(number_of_bytes) return "" if number_of_bytes == 0 data = @io.read(number_of_bytes) or raise EOFError # If the `read` call returned less data than expected we should not # proceed. raise EOFError if data.size != number_of_bytes data end |
#string ⇒ String
Decodes a string from the IO object.
96 97 98 99 100 101 102 103 104 |
# File 'lib/kafka/protocol/decoder.rb', line 96 def string size = int16 if size == -1 nil else read(size) end end |
#varint ⇒ Integer
Read an integer under varints serializing from the IO object. https://developers.google.com/protocol-buffers/docs/encoding#varints
123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/kafka/protocol/decoder.rb', line 123 def varint group = 0 data = 0 loop do chunk = int8 data |= (chunk & (~VARINT_MASK)) << group group += 7 break if (chunk & VARINT_MASK) == 0 end data & 0b1 != 0 ? ~(data >> 1) : (data >> 1) end |
#varint_array(&block) ⇒ Array
Decodes an array from the IO object. Just like #array except the size is in varint format
88 89 90 91 |
# File 'lib/kafka/protocol/decoder.rb', line 88 def varint_array(&block) size = varint size.times.map(&block) end |
#varint_bytes ⇒ String
Decodes a list of bytes from the IO object. The size is in varint format
151 152 153 154 155 156 157 158 159 |
# File 'lib/kafka/protocol/decoder.rb', line 151 def varint_bytes size = varint if size == -1 nil else read(size) end end |
#varint_string ⇒ String
Decodes a string from the IO object, the size is in varint format
109 110 111 112 113 114 115 116 117 |
# File 'lib/kafka/protocol/decoder.rb', line 109 def varint_string size = varint if size == -1 nil else read(size) end end |