Class: Kafka::Protocol::Decoder
- Inherits: Object
- Defined in: lib/kafka/protocol/decoder.rb
Overview
A decoder wraps an IO object and provides methods for reading specific data types from it. The Kafka protocol is not self-describing, so the caller must invoke these methods in exactly the order in which the fields appear on the wire; reading a field with the wrong method silently yields wrong values.
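To make the ordering requirement concrete, here is a minimal sketch that works on a plain StringIO with the same `unpack` directives the decoder methods use. The record layout (an int16 id, an int32 count, then a size-prefixed string) is hypothetical and chosen only for illustration.

```ruby
require "stringio"

# Hypothetical record layout, for illustration only:
#   int16 id, int32 count, then a string (int16 size followed by the bytes).
payload = [7, 42, 5].pack("s>l>s>") + "hello"

# Correct order: read each field with the width it was written with.
io    = StringIO.new(payload)
id    = io.read(2).unpack("s>").first   # what #int16 does
count = io.read(4).unpack("l>").first   # what #int32 does
size  = io.read(2).unpack("s>").first   # #string: size prefix first ...
name  = io.read(size)                   # ... then the bytes themselves

# Wrong order: reading an int32 first consumes the id plus half of count.
io      = StringIO.new(payload)
garbled = io.read(4).unpack("l>").first # => 458752, not 7
```

Nothing in the byte stream signals the mistake in the second read, which is why the order of calls has to come from the protocol definition.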
Class Method Summary
-
.from_string(str) ⇒ Decoder
Creates a decoder that reads from the given string.
Instance Method Summary
-
#array(&block) ⇒ Array
Decodes an array from the IO object.
-
#boolean ⇒ Boolean
Decodes an 8-bit boolean from the IO object.
-
#bytes ⇒ String
Decodes a list of bytes from the IO object.
-
#eof? ⇒ Boolean
Returns true when the IO object has no more data to read.
-
#initialize(io) ⇒ Decoder
constructor
Initializes a new decoder.
-
#int16 ⇒ Integer
Decodes a 16-bit integer from the IO object.
-
#int32 ⇒ Integer
Decodes a 32-bit integer from the IO object.
-
#int64 ⇒ Integer
Decodes a 64-bit integer from the IO object.
-
#int8 ⇒ Integer
Decodes an 8-bit integer from the IO object.
-
#peek(offset, length) ⇒ Array<Integer>
Returns upcoming bytes without changing the current IO offset.
-
#read(number_of_bytes) ⇒ String
Reads the specified number of bytes from the IO object, returning them as a String.
-
#string ⇒ String
Decodes a string from the IO object.
-
#varint ⇒ Integer
Decodes a variable-length, zigzag-encoded integer (varint) from the IO object.
-
#varint_array(&block) ⇒ Array
Decodes an array from the IO object; the size is in varint format.
-
#varint_bytes ⇒ String
Decodes a list of bytes from the IO object; the size is in varint format.
-
#varint_string ⇒ String
Decodes a string from the IO object; the size is in varint format.
Constructor Details
#initialize(io) ⇒ Decoder
Initializes a new decoder.
# File 'lib/kafka/protocol/decoder.rb', line 16

def initialize(io)
  @io = io
end
Class Method Details
.from_string(str) ⇒ Decoder
Creates a decoder that reads from the given string by wrapping it in a StringIO.
# File 'lib/kafka/protocol/decoder.rb', line 9

def self.from_string(str)
  new(StringIO.new(str))
end
Instance Method Details
#array(&block) ⇒ Array
Decodes an array from the IO object.
The array is prefixed with its size as a 32-bit integer. The provided block is called once for each item in the array and is responsible for decoding the item with the proper type, since the wire format carries no information from which the type could be inferred.
# File 'lib/kafka/protocol/decoder.rb', line 77

def array(&block)
  size = int32
  size.times.map(&block)
end
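As a rough illustration of how the block drives element decoding, the sketch below uses `MiniDecoder`, a hypothetical stand-in that copies only `#int16`, `#int32` and `#array` from the listings in this document. It is not the library class, and the payload is made up.

```ruby
require "stringio"

# MiniDecoder is a hypothetical stand-in that copies only the methods needed
# here from the listings in this document; it is not the library class.
class MiniDecoder
  def initialize(io)
    @io = io
  end

  def int16
    @io.read(2).unpack("s>").first
  end

  def int32
    @io.read(4).unpack("l>").first
  end

  def array(&block)
    size = int32            # element count comes first
    size.times.map(&block)  # the block decodes one element per call
  end
end

# Wire data: an int32 element count (3) followed by three int16 values.
payload = [3].pack("l>") + [10, 20, 30].pack("s>*")
decoder = MiniDecoder.new(StringIO.new(payload))

values = decoder.array { decoder.int16 }  # => [10, 20, 30]
```

The block ignores the index that `times` passes in; it only needs to consume exactly one element from the stream on each call.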
#boolean ⇒ Boolean
Decodes an 8-bit boolean from the IO object.
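# File 'lib/kafka/protocol/decoder.rb', line 37

def boolean
  # Compare the decoded byte value: the String returned by #read never
  # equals the Integer 0x1, so `read(1) == 0x1` would always be false.
  int8 == 0x1
end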
#bytes ⇒ String, nil
Decodes a list of bytes from the IO object. The data is prefixed with its size as a 32-bit integer; a size of -1 denotes a null value and is returned as nil.
# File 'lib/kafka/protocol/decoder.rb', line 135

def bytes
  size = int32

  if size == -1
    nil
  else
    read(size)
  end
end
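The difference between a null value and an empty one is easy to miss, so here is a small sketch of the three cases. `decode_bytes` is a hypothetical helper that mirrors the logic of `#bytes` on a plain StringIO.

```ruby
require "stringio"

# Hypothetical helper mirroring #bytes: an int32 size prefix, where a size
# of -1 marks a null value.
def decode_bytes(io)
  size = io.read(4).unpack("l>").first
  size == -1 ? nil : io.read(size)
end

null_value  = decode_bytes(StringIO.new([-1].pack("l>")))         # nil
empty_value = decode_bytes(StringIO.new([0].pack("l>")))          # ""
abc_value   = decode_bytes(StringIO.new([3].pack("l>") + "abc"))  # "abc"
```

Callers that treat the result as a String therefore need a nil check first.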
#eof? ⇒ Boolean
Returns true when the underlying IO object is at end of file.
# File 'lib/kafka/protocol/decoder.rb', line 20

def eof?
  @io.eof?
end
#int16 ⇒ Integer
Decodes a 16-bit integer from the IO object.
# File 'lib/kafka/protocol/decoder.rb', line 51

def int16
  read(2).unpack("s>").first
end
#int32 ⇒ Integer
Decodes a 32-bit integer from the IO object.
# File 'lib/kafka/protocol/decoder.rb', line 58

def int32
  read(4).unpack("l>").first
end
#int64 ⇒ Integer
Decodes a 64-bit integer from the IO object.
# File 'lib/kafka/protocol/decoder.rb', line 65

def int64
  read(8).unpack("q>").first
end
#int8 ⇒ Integer
Decodes an 8-bit integer from the IO object. The "C" directive interprets the byte as unsigned, so the result is in the range 0 to 255.
# File 'lib/kafka/protocol/decoder.rb', line 44

def int8
  read(1).unpack("C").first
end
#peek(offset, length) ⇒ Array<Integer>
Returns up to length upcoming bytes, starting offset bytes ahead of the current position, without changing the current IO offset. The bytes are returned as an array of integers; the array is empty if no data is available.
# File 'lib/kafka/protocol/decoder.rb', line 27

def peek(offset, length)
  data = @io.read(offset + length)
  return [] if data.nil?
  @io.ungetc(data)
  data.bytes[offset, offset + length] || []
end
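The following sketch shows the observable behaviour of peeking on a StringIO. The `peek` function here is a hypothetical stand-in that follows the listing above; it slices with `length` rather than `offset + length`, which gives the same result because `data` never holds more than `offset + length` bytes.

```ruby
require "stringio"

# Hypothetical stand-in following the #peek listing.
def peek(io, offset, length)
  data = io.read(offset + length)
  return [] if data.nil?
  io.ungetc(data)                   # push the bytes back; position is restored
  data.bytes[offset, length] || []
end

io = StringIO.new("\x01\x02\x03\x04\x05".b)

peeked   = peek(io, 1, 2)  # bytes at positions 1 and 2, as integers
position = io.pos          # unchanged by the peek
first    = io.read(1)      # a normal read still starts at the first byte

nothing = peek(StringIO.new("".b), 0, 1)  # no data available
```

Because the bytes are pushed back with `ungetc`, this approach assumes the IO object supports ungetting a multi-byte string, as StringIO does.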
#read(number_of_bytes) ⇒ String
Reads the specified number of bytes from the IO object, returning them as a String. Raises EOFError if the IO object yields fewer bytes than requested.
# File 'lib/kafka/protocol/decoder.rb', line 162

def read(number_of_bytes)
  return "" if number_of_bytes == 0

  data = @io.read(number_of_bytes) or raise EOFError

  # If the `read` call returned less data than expected we should not
  # proceed.
  raise EOFError if data.size != number_of_bytes

  data
end
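A short sketch of the failure mode this guards against: a payload that was cut off in the middle of a field. `read_exactly` is a hypothetical stand-in with the same logic as `#read`, applied to a plain StringIO.

```ruby
require "stringio"

# Hypothetical stand-in with the same logic as #read.
def read_exactly(io, number_of_bytes)
  return "" if number_of_bytes == 0
  data = io.read(number_of_bytes) or raise EOFError  # nil means nothing left
  raise EOFError if data.size != number_of_bytes     # short read
  data
end

# An int32 needs four bytes, but this payload was cut off after two.
truncated = StringIO.new([1].pack("l>")[0, 2])

outcome =
  begin
    read_exactly(truncated, 4)
  rescue EOFError
    :eof
  end
```

Without the size check, the two available bytes would be handed to `unpack("l>")`, which would return nil instead of raising.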
#string ⇒ String, nil
Decodes a string from the IO object. The string is prefixed with its size as a 16-bit integer; a size of -1 denotes a null value and is returned as nil.
# File 'lib/kafka/protocol/decoder.rb', line 94

def string
  size = int16

  if size == -1
    nil
  else
    read(size)
  end
end
#varint ⇒ Integer
Decodes a variable-length integer (varint) from the IO object. Each byte carries 7 bits of data, least significant group first, and a set high bit means another byte follows. The assembled value is then zigzag-decoded, so the result is a signed integer. See https://developers.google.com/protocol-buffers/docs/encoding#varints
# File 'lib/kafka/protocol/decoder.rb', line 121

def varint
  group = 0
  data = 0
  while (chunk = int8) & 0x80 != 0
    data |= (chunk & 0x7f) << group
    group += 7
  end
  data |= chunk << group
  data & 0b1 != 0 ? ~(data >> 1) : (data >> 1)
end
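A few worked values may help when checking the decoding by hand. `decode_varint` is a hypothetical helper that runs the same loop as `#varint`, but over an array of byte values instead of an IO object.

```ruby
# Hypothetical helper: the same loop as #varint, reading from an array of
# byte values instead of an IO object.
def decode_varint(bytes)
  bytes = bytes.dup
  group = 0
  data  = 0
  while (chunk = bytes.shift) & 0x80 != 0  # high bit set: more bytes follow
    data |= (chunk & 0x7f) << group        # keep the low 7 bits
    group += 7
  end
  data |= chunk << group                   # final byte, high bit clear
  # Zigzag: odd raw values map to negatives, even raw values to non-negatives.
  data & 0b1 != 0 ? ~(data >> 1) : (data >> 1)
end

zero          = decode_varint([0x00])        # raw 0   => 0
minus_one     = decode_varint([0x01])        # raw 1   => -1
one           = decode_varint([0x02])        # raw 2   => 1
minus_two     = decode_varint([0x03])        # raw 3   => -2
three_hundred = decode_varint([0xd8, 0x04])  # raw 600 => 300
```

In the last case the first byte contributes 0x58 (88) and the second contributes 4 << 7 (512), giving a raw value of 600, which zigzag-decodes to 300.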
#varint_array(&block) ⇒ Array
Decodes an array from the IO object. Works just like #array, except that the size is in varint format.
# File 'lib/kafka/protocol/decoder.rb', line 86

def varint_array(&block)
  size = varint
  size.times.map(&block)
end
#varint_bytes ⇒ String, nil
Decodes a list of bytes from the IO object. The size is in varint format; a size of -1 denotes a null value and is returned as nil.
# File 'lib/kafka/protocol/decoder.rb', line 148

def varint_bytes
  size = varint

  if size == -1
    nil
  else
    read(size)
  end
end
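Because the size prefix is zigzag-encoded, the null marker -1 appears on the wire as the single byte 0x01, and a size of 3 appears as 0x06. The sketch below shows both cases; `read_varint` and `decode_varint_bytes` are hypothetical stand-ins for `#varint` and `#varint_bytes`.

```ruby
require "stringio"

# Hypothetical stand-in for #varint, reading directly from an IO object.
def read_varint(io)
  group = 0
  data  = 0
  while (chunk = io.read(1).unpack("C").first) & 0x80 != 0
    data |= (chunk & 0x7f) << group
    group += 7
  end
  data |= chunk << group
  data & 0b1 != 0 ? ~(data >> 1) : (data >> 1)
end

# Hypothetical stand-in for #varint_bytes.
def decode_varint_bytes(io)
  size = read_varint(io)
  size == -1 ? nil : io.read(size)
end

null_value = decode_varint_bytes(StringIO.new("\x01".b))     # size -1 => nil
abc_value  = decode_varint_bytes(StringIO.new("\x06abc".b))  # size 3  => "abc"
```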
#varint_string ⇒ String, nil
Decodes a string from the IO object. The size is in varint format; a size of -1 denotes a null value and is returned as nil.
# File 'lib/kafka/protocol/decoder.rb', line 107

def varint_string
  size = varint

  if size == -1
    nil
  else
    read(size)
  end
end