Class: Kafka::Protocol::Decoder

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/decoder.rb

Overview

A decoder wraps an IO object, making it easy to read specific data types from it. The Kafka protocol is not self-describing, so a client must call these methods in just the right order for things to work.

Constant Summary collapse

VARINT_MASK =
0b10000000

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io) ⇒ Decoder

Initializes a new decoder.

Parameters:

  • io (IO)

    an object that acts as an IO.



18
19
20
# File 'lib/kafka/protocol/decoder.rb', line 18

def initialize(io)
  @io = io
end

Class Method Details

.from_string(str) ⇒ Object



11
12
13
# File 'lib/kafka/protocol/decoder.rb', line 11

def self.from_string(str)
  new(StringIO.new(str))
end

Instance Method Details

#array(&block) ⇒ Array

Decodes an array from the IO object.

The provided block will be called once for each item in the array. It is the responsibility of the block to decode the proper type in the block, since there's no information that allows the type to be inferred automatically.

Returns:

  • (Array)


79
80
81
82
# File 'lib/kafka/protocol/decoder.rb', line 79

def array(&block)
  size = int32
  size.times.map(&block)
end

#booleanBoolean

Decodes an 8-bit boolean from the IO object.

Returns:

  • (Boolean)


39
40
41
# File 'lib/kafka/protocol/decoder.rb', line 39

def boolean
  read(1) == 0x1
end

#bytesString

Decodes a list of bytes from the IO object.

Returns:

  • (String)


138
139
140
141
142
143
144
145
146
# File 'lib/kafka/protocol/decoder.rb', line 138

def bytes
  size = int32

  if size == -1
    nil
  else
    read(size)
  end
end

#eof?Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/kafka/protocol/decoder.rb', line 22

def eof?
  @io.eof?
end

#int16Integer

Decodes a 16-bit integer from the IO object.

Returns:

  • (Integer)


53
54
55
# File 'lib/kafka/protocol/decoder.rb', line 53

def int16
  read(2).unpack("s>").first
end

#int32Integer

Decodes a 32-bit integer from the IO object.

Returns:

  • (Integer)


60
61
62
# File 'lib/kafka/protocol/decoder.rb', line 60

def int32
  read(4).unpack("l>").first
end

#int64Integer

Decodes a 64-bit integer from the IO object.

Returns:

  • (Integer)


67
68
69
# File 'lib/kafka/protocol/decoder.rb', line 67

def int64
  read(8).unpack("q>").first
end

#int8Integer

Decodes an 8-bit integer from the IO object.

Returns:

  • (Integer)


46
47
48
# File 'lib/kafka/protocol/decoder.rb', line 46

def int8
  read(1).unpack("C").first
end

#peek(offset, length) ⇒ Integer

Get some next bytes without touching the current io offset

Returns:

  • (Integer)


29
30
31
32
33
34
# File 'lib/kafka/protocol/decoder.rb', line 29

def peek(offset, length)
  data = @io.read(offset + length)
  return [] if data.nil?
  @io.ungetc(data)
  data.bytes[offset, offset + length] || []
end

#read(number_of_bytes) ⇒ String

Reads the specified number of bytes from the IO object, returning them as a String.

Returns:

  • (String)

Raises:

  • (EOFError)


165
166
167
168
169
170
171
172
173
174
175
# File 'lib/kafka/protocol/decoder.rb', line 165

def read(number_of_bytes)
  return "" if number_of_bytes == 0

  data = @io.read(number_of_bytes) or raise EOFError

  # If the `read` call returned less data than expected we should not
  # proceed.
  raise EOFError if data.size != number_of_bytes

  data
end

#stringString

Decodes a string from the IO object.

Returns:

  • (String)


96
97
98
99
100
101
102
103
104
# File 'lib/kafka/protocol/decoder.rb', line 96

def string
  size = int16

  if size == -1
    nil
  else
    read(size)
  end
end

#varintInteger

Read an integer under varints serializing from the IO object. https://developers.google.com/protocol-buffers/docs/encoding#varints

Returns:

  • (Integer)


123
124
125
126
127
128
129
130
131
132
133
# File 'lib/kafka/protocol/decoder.rb', line 123

def varint
  group = 0
  data = 0
  loop do
    chunk = int8
    data |= (chunk & (~VARINT_MASK)) << group
    group += 7
    break if (chunk & VARINT_MASK) == 0
  end
  data & 0b1 != 0 ? ~(data >> 1) : (data >> 1)
end

#varint_array(&block) ⇒ Array

Decodes an array from the IO object. Just like #array except the size is in varint format

Returns:

  • (Array)


88
89
90
91
# File 'lib/kafka/protocol/decoder.rb', line 88

def varint_array(&block)
  size = varint
  size.times.map(&block)
end

#varint_bytesString

Decodes a list of bytes from the IO object. The size is in varint format

Returns:

  • (String)


151
152
153
154
155
156
157
158
159
# File 'lib/kafka/protocol/decoder.rb', line 151

def varint_bytes
  size =  varint

  if size == -1
    nil
  else
    read(size)
  end
end

#varint_stringString

Decodes a string from the IO object, the size is in varint format

Returns:

  • (String)


109
110
111
112
113
114
115
116
117
# File 'lib/kafka/protocol/decoder.rb', line 109

def varint_string
  size = varint

  if size == -1
    nil
  else
    read(size)
  end
end