Class: Kafka::Protocol::Encoder

Inherits:
Object
Defined in:
lib/kafka/protocol/encoder.rb

Overview

An encoder wraps an IO object, making it easy to write specific data types to it.

Constructor Details

#initialize(io) ⇒ Encoder

Initializes a new encoder.

Parameters:

  • io (IO)

    an object that acts as an IO.



# File 'lib/kafka/protocol/encoder.rb', line 13

def initialize(io)
  @io = io
  @io.set_encoding(Encoding::BINARY)
end
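
As a rough illustration of what the constructor does to the wrapped object, the sketch below applies the same `set_encoding` call to a `StringIO` from the standard library. Using `StringIO` here is an assumption made for the example; any object that responds to `set_encoding` and `write` should behave similarly.

```ruby
require "stringio"

io = StringIO.new
io.set_encoding(Encoding::BINARY)  # the same call #initialize makes on @io

io.write([42].pack("s>"))          # raw bytes, like the #write_* methods produce
written = io.string.bytes          # => [0, 42]
```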

Class Method Details

.encode_with(object) ⇒ String

Encodes an object into a new buffer.

Parameters:

  • object (#encode)

    the object that will encode itself.

Returns:

  • (String)

    the encoded data.



# File 'lib/kafka/protocol/encoder.rb', line 174

def self.encode_with(object)
  buffer = StringIO.new
  encoder = new(buffer)

  object.encode(encoder)

  buffer.string
end
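
The sketch below shows how an object might encode itself through `.encode_with`. `SketchEncoder` is a stand-in that copies only the methods needed here so the example runs without the gem installed, and `Greeting` is a hypothetical message type; neither is part of the library.

```ruby
require "stringio"

# Stand-in for Kafka::Protocol::Encoder, reduced to what this sketch needs.
class SketchEncoder
  def self.encode_with(object)
    buffer = StringIO.new
    object.encode(new(buffer))
    buffer.string
  end

  def initialize(io)
    @io = io
    @io.set_encoding(Encoding::BINARY)
  end

  def write_int16(int)
    @io.write([int].pack("s>"))
    nil
  end

  def write_string(string)
    write_int16(string.bytesize)
    @io.write(string)
    nil
  end
end

# Hypothetical message type; anything that responds to #encode would work.
Greeting = Struct.new(:version, :name) do
  def encode(encoder)
    encoder.write_int16(version)
    encoder.write_string(name)
  end
end

data = SketchEncoder.encode_with(Greeting.new(1, "kafka"))
data.bytes  # => [0, 1, 0, 5, 107, 97, 102, 107, 97]
```

The object decides which fields to write and in what order; the encoder only supplies the primitive writers and the buffer.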

Instance Method Details

#write(bytes) ⇒ nil

Writes bytes directly to the IO object.

Parameters:

  • bytes (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 22

def write(bytes)
  @io.write(bytes)

  nil
end

#write_array(array, &block) ⇒ nil

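Writes an array to the IO object. The element count is written first as a 32-bit integer; a nil array is written as a count of -1, which is distinct from an empty array.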

Each item in the specified array will be yielded to the provided block; it’s the responsibility of the block to write those items using the encoder.

Parameters:

  • array (Array)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 76

def write_array(array, &block)
  if array.nil?
    # An array can be null, which is different from it being empty.
    write_int32(-1)
  else
    write_int32(array.size)
    array.each(&block)
  end
end
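
To make the resulting byte layout concrete, here is a sketch that mirrors the logic above with plain `Array#pack` instead of an encoder. `array_layout` is a hypothetical helper written for this example, and the block returns the bytes for each item rather than writing them.

```ruby
# Mirrors #write_array: a big-endian int32 element count, followed by
# whatever bytes the block produces for each element.
def array_layout(array)
  return [-1].pack("l>") if array.nil?  # null array, distinct from empty

  [array.size].pack("l>") + array.map { |item| yield item }.join
end

null_array  = array_layout(nil)    { |n| [n].pack("s>") }
empty_array = array_layout([])     { |n| [n].pack("s>") }
two_int16s  = array_layout([7, 8]) { |n| [n].pack("s>") }

null_array.bytes   # => [255, 255, 255, 255]
empty_array.bytes  # => [0, 0, 0, 0]
two_int16s.bytes   # => [0, 0, 0, 2, 0, 7, 0, 8]
```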

#write_boolean(boolean) ⇒ nil

Writes an 8-bit boolean to the IO object.

Parameters:

  • boolean (Boolean)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 32

def write_boolean(boolean)
  boolean ? write_int8(1) : write_int8(0)
end

#write_bytes(bytes) ⇒ nil

Writes a byte string to the IO object, prefixed with its size in bytes as a 32-bit integer. A nil value is written as a size of -1.

Parameters:

  • bytes (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 148

def write_bytes(bytes)
  if bytes.nil?
    write_int32(-1)
  else
    write_int32(bytes.bytesize)
    write(bytes)
  end
end
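
The length prefix is what lets a reader recover the payload. The sketch below builds the same framing by hand and then reads it back; the decoding half is an illustration only and is not taken from this class.

```ruby
payload = "\x00\xFFkafka".b
framed  = [payload.bytesize].pack("l>") + payload  # same layout #write_bytes emits

# Reading it back: the first four bytes give the payload length.
size = framed.unpack1("l>")        # => 7
body = framed.byteslice(4, size)   # the original payload
```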

#write_int16(int) ⇒ nil

Writes a signed 16-bit integer to the IO object in big-endian byte order.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 48

def write_int16(int)
  write([int].pack("s>"))
end

#write_int32(int) ⇒ nil

Writes a signed 32-bit integer to the IO object in big-endian byte order.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 56

def write_int32(int)
  write([int].pack("l>"))
end

#write_int64(int) ⇒ nil

Writes a signed 64-bit integer to the IO object in big-endian byte order.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 64

def write_int64(int)
  write([int].pack("q>"))
end

#write_int8(int) ⇒ nil

Writes an 8-bit integer to the IO object.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 40

def write_int8(int)
  write([int].pack("C"))
end
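
The fixed-width writers differ only in the `Array#pack` directive they use. The sketch below shows the bytes each directive produces for a few sample values, without going through an encoder.

```ruby
int8_one   = [1].pack("C").bytes     # => [1]
int8_neg   = [-1].pack("C").bytes    # => [255], "C" is the unsigned 8-bit directive
int16_258  = [258].pack("s>").bytes  # => [1, 2], most significant byte first
int32_neg  = [-1].pack("l>").bytes   # => [255, 255, 255, 255]
int64_size = [1].pack("q>").bytesize # => 8
```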

#write_string(string) ⇒ nil

Writes a string to the IO object, prefixed with its size in bytes as a 16-bit integer. A nil value is written as a size of -1.

Parameters:

  • string (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 104

def write_string(string)
  if string.nil?
    write_int16(-1)
  else
    write_int16(string.bytesize)
    write(string)
  end
end
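
Because the prefix uses `bytesize` rather than `length`, multibyte characters count by their encoded size. A small sketch with a sample UTF-8 string, built by hand rather than through an encoder:

```ruby
name = "caf\u00e9"   # four characters, five bytes in UTF-8

chars  = name.length    # => 4
octets = name.bytesize  # => 5

framed = [name.bytesize].pack("s>") + name.b  # same layout #write_string emits
framed.bytes  # => [0, 5, 99, 97, 102, 195, 169]
```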

#write_varint(int) ⇒ nil

Writes an integer to the IO object as a zigzag-encoded varint. See developers.google.com/protocol-buffers/docs/encoding#varints

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 131

def write_varint(int)
  int = int << 1
  int = ~int | 1 if int < 0

  chunks = []
  while int >> 7 != 0
    chunks << (int & 0x7f | 0x80)
    int >>= 7
  end
  chunks << int
  write(chunks.pack("C*"))
end
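
The method first zigzag-maps the signed value onto a non-negative one (0, -1, 1, -2 become 0, 1, 2, 3), then emits it seven bits at a time, setting the high bit on every byte except the last. The sketch below repeats that logic in a hypothetical standalone helper, `zigzag_varint`, that returns the byte values instead of writing them.

```ruby
def zigzag_varint(int)
  int = int << 1
  int = ~int | 1 if int < 0   # zigzag: negatives map to odd numbers

  chunks = []
  while int >> 7 != 0
    chunks << (int & 0x7f | 0x80)  # low 7 bits, continuation bit set
    int >>= 7
  end
  chunks << int                    # final byte, continuation bit clear
  chunks
end

zigzag_varint(0)    # => [0]
zigzag_varint(-1)   # => [1]
zigzag_varint(1)    # => [2]
zigzag_varint(-2)   # => [3]
zigzag_varint(63)   # => [126]
zigzag_varint(64)   # => [128, 1]
zigzag_varint(300)  # => [216, 4]
```

Small magnitudes of either sign fit in a single byte, which is the point of the zigzag step.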

#write_varint_array(array, &block) ⇒ nil

Writes an array to the IO object. Works like #write_array, except that the size is written in varint format.

Parameters:

  • array (Array)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 91

def write_varint_array(array, &block)
  if array.nil?
    write_varint(-1)
  else
    write_varint(array.size)
    array.each(&block)
  end
end

#write_varint_bytes(bytes) ⇒ nil

Writes a byte string to the IO object, prefixed with its size in varint format. A nil value is written as a size of -1.

Parameters:

  • bytes (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 161

def write_varint_bytes(bytes)
  if bytes.nil?
    write_varint(-1)
  else
    write_varint(bytes.bytesize)
    write(bytes)
  end
end
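
One consequence of the zigzag step in #write_varint is that the size prefix is not the raw byte count: a nil value is written as the single byte 1, and a three-byte payload is prefixed with 6. The sketch below reproduces this with two hypothetical helpers, `varint` and `varint_bytes`, which return strings rather than writing to an IO.

```ruby
# Same arithmetic as #write_varint, returning the packed bytes.
def varint(int)
  int = int << 1
  int = ~int | 1 if int < 0

  chunks = []
  while int >> 7 != 0
    chunks << (int & 0x7f | 0x80)
    int >>= 7
  end
  chunks << int
  chunks.pack("C*")
end

# Same layout as #write_varint_bytes: varint size, then the payload.
def varint_bytes(bytes)
  return varint(-1) if bytes.nil?

  varint(bytes.bytesize) + bytes.b
end

nil_value   = varint_bytes(nil).bytes    # => [1]
empty_value = varint_bytes("").bytes     # => [0]
abc_value   = varint_bytes("abc").bytes  # => [6, 97, 98, 99]
```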

#write_varint_string(string) ⇒ nil

Writes a string to the IO object, prefixed with its size in varint format. A nil value is written as a size of -1.

Parameters:

  • string (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 117

def write_varint_string(string)
  if string.nil?
    write_varint(-1)
  else
    write_varint(string.bytesize)
    write(string)
  end
end