Class: Kafka::Message
- Inherits: Object (Object → Kafka::Message)
- Defined in: lib/kafka/message.rb
Overview
A message. The format of a message is as follows:
- 4-byte big-endian int: length of the message in bytes (including the rest of the header, but excluding the length field itself)
- 1 byte: “magic” identifier (format version number)
- If the magic byte == 0, there is one more header field:
  - 4-byte big-endian int: CRC32 checksum of the payload
- If the magic byte == 1, there are two more header fields:
  - 1 byte: “attributes” (flags for compression, codec, etc.)
  - 4-byte big-endian int: CRC32 checksum of the payload
- All following bytes are the payload.
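As an illustration of the version-0 layout above, the following sketch frames a payload by hand with Array#pack and reads the header back with String#unpack. The helper name build_v0_message is hypothetical and not part of kafka-rb; it only mirrors the byte layout described in this overview.

```ruby
require "zlib"

# Hypothetical helper (not part of kafka-rb): frames a payload as a
# version-0 message using the byte layout from the Overview.
def build_v0_message(payload)
  payload = payload.b                      # treat the payload as raw bytes
  body = [0].pack("C") +                   # 1 byte: magic identifier = 0
         [Zlib.crc32(payload)].pack("N") + # 4-byte big-endian CRC32 of the payload
         payload
  [body.bytesize].pack("N") + body         # length prefix excludes itself
end

frame = build_v0_message("hello")
length, magic = frame.unpack("NC")         # same directives as BASIC_MESSAGE_HEADER
checksum = frame.byteslice(5, 4).unpack1("N")
payload  = frame.byteslice(9, length - 5)

puts length                                # 10 = 1 (magic) + 4 (CRC) + 5 (payload)
puts magic                                 # 0
puts checksum == Zlib.crc32("hello")       # true
puts payload                               # hello
```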
Defined Under Namespace
Classes: MessageSet
Constant Summary
- MAGIC_IDENTIFIER_DEFAULT = 0
- MAGIC_IDENTIFIER_COMPRESSION = 1
- NO_COMPRESSION = 0
- GZIP_COMPRESSION = 1
- SNAPPY_COMPRESSION = 2
- BASIC_MESSAGE_HEADER = 'NC'.freeze
- VERSION_0_HEADER = 'N'.freeze
- VERSION_1_HEADER = 'CN'.freeze
- COMPRESSION_CODEC_MASK = 0x03
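The sketch below shows how the header format strings and COMPRESSION_CODEC_MASK fit together when reading a version-1 header. The constant values are copied into a hypothetical KafkaConstantsSketch module so the snippet runs without the kafka-rb gem, and the header bytes are made up for illustration.

```ruby
# Local copies of the Kafka::Message constants, placed in a hypothetical
# module so the snippet runs without the kafka-rb gem installed.
module KafkaConstantsSketch
  NO_COMPRESSION         = 0
  GZIP_COMPRESSION       = 1
  SNAPPY_COMPRESSION     = 2
  BASIC_MESSAGE_HEADER   = 'NC'.freeze # 4-byte length + 1-byte magic
  VERSION_1_HEADER       = 'CN'.freeze # 1-byte attributes + 4-byte checksum
  COMPRESSION_CODEC_MASK = 0x03        # low two bits of attributes = codec
end
include KafkaConstantsSketch

# Made-up version-1 header (empty payload): length 6, magic 1,
# attributes 0b110, checksum 0xDEADBEEF.
header = [6, 1].pack("NC") + [0b110, 0xDEADBEEF].pack("CN")

size, magic          = header.unpack(BASIC_MESSAGE_HEADER)
attributes, checksum = header.byteslice(5, 5).unpack(VERSION_1_HEADER)
codec = attributes & COMPRESSION_CODEC_MASK   # 0b110 & 0b011 => 0b010

codec_name =
  case codec
  when NO_COMPRESSION     then "none"
  when GZIP_COMPRESSION   then "gzip"
  when SNAPPY_COMPRESSION then "snappy"
  else "unknown"
  end

puts codec_name # snappy
```

Masking matters because only the low two bits of the attributes byte select the codec; any higher bits are ignored by the codec dispatch.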
Instance Attribute Summary
- #checksum ⇒ Object
  Returns the value of attribute checksum.
- #magic ⇒ Object
  Returns the value of attribute magic.
- #payload ⇒ Object
  Returns the value of attribute payload.
Class Method Summary
- .ensure_snappy! ⇒ Object
- .parse_from(data) ⇒ Object
  Takes a byte string containing one or more messages; returns a MessageSet with the messages parsed from the string, and the number of bytes consumed from the string.
Instance Method Summary
- #calculate_checksum ⇒ Object
- #encode(compression = NO_COMPRESSION) ⇒ Object
- #ensure_snappy!(&block) ⇒ Object
- #initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) ⇒ Message (constructor)
  A new instance of Message.
- #valid? ⇒ Boolean
Constructor Details
#initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) ⇒ Message
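Returns a new instance of Message. A nil payload is stored as an empty string; if no checksum is given, it is computed from the payload with #calculate_checksum. The compression codec starts as NO_COMPRESSION.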
# File 'lib/kafka/message.rb', line 47
def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
  self.magic = magic
  self.payload = payload || ""
  self.checksum = checksum || self.calculate_checksum
  @compression = NO_COMPRESSION
end
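To illustrate the constructor's defaults, here is a minimal stand-in class that copies only initialize, calculate_checksum and valid? as shown on this page. SketchMessage is a hypothetical name used so the example runs without kafka-rb; it is not the real Kafka::Message.

```ruby
require "zlib"

# Hypothetical stand-in reproducing the constructor, calculate_checksum and
# valid? shown on this page (not the real Kafka::Message class).
class SketchMessage
  MAGIC_IDENTIFIER_DEFAULT = 0
  NO_COMPRESSION = 0

  attr_accessor :magic, :checksum, :payload

  def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
    self.magic = magic
    self.payload = payload || ""                        # nil payload => ""
    self.checksum = checksum || self.calculate_checksum # default: CRC32 of payload
    @compression = NO_COMPRESSION
  end

  def calculate_checksum
    Zlib.crc32(self.payload)
  end

  def valid?
    self.checksum == calculate_checksum
  end
end

empty = SketchMessage.new
puts empty.payload.inspect # ""
puts empty.checksum        # 0 (CRC32 of the empty string)

explicit = SketchMessage.new("hello", 0, 12345)
puts explicit.valid?       # false: a supplied checksum is kept as-is
```

Because a caller-supplied checksum is stored without verification, parse_from can build messages straight from wire data and leave the integrity check to #valid?.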
Instance Attribute Details
#checksum ⇒ Object
Returns the value of attribute checksum.
# File 'lib/kafka/message.rb', line 45
def checksum
  @checksum
end
#magic ⇒ Object
Returns the value of attribute magic.
# File 'lib/kafka/message.rb', line 45
def magic
  @magic
end
#payload ⇒ Object
Returns the value of attribute payload.
# File 'lib/kafka/message.rb', line 45
def payload
  @payload
end
Class Method Details
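.ensure_snappy! ⇒ Object
Yields to the given block (returning its value) if a top-level Snappy constant is defined, i.e. the snappy library has been loaded; otherwise raises a RuntimeError with the message "Snappy not available!".
# File 'lib/kafka/message.rb', line 137
def self.ensure_snappy!
  if Object.const_defined? "Snappy"
    yield
  else
    fail "Snappy not available!"
  end
end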
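The guard only checks whether a top-level Snappy constant exists, so it depends on the snappy gem having been required beforehand. The sketch below copies that pattern into a hypothetical SnappyGuardSketch module and exercises both branches, using a placeholder Snappy module defined in the script to simulate a loaded library (assuming the real snappy gem is not loaded in the same process).

```ruby
# Hypothetical copy of the guard pattern used by Kafka::Message.ensure_snappy!.
module SnappyGuardSketch
  def self.ensure_snappy!
    if Object.const_defined? "Snappy"
      yield
    else
      fail "Snappy not available!" # raises RuntimeError
    end
  end
end

# Without a Snappy constant, the guard raises instead of yielding.
begin
  SnappyGuardSketch.ensure_snappy! { puts "unreachable" }
rescue RuntimeError => e
  first_error = e.message
end
puts first_error # Snappy not available!

# Simulate a loaded library by defining a placeholder constant.
Object.const_set(:Snappy, Module.new) unless Object.const_defined?(:Snappy)
result = SnappyGuardSketch.ensure_snappy! { :decompressed }
puts result      # decompressed
```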
.parse_from(data) ⇒ Object
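Takes a byte string containing one or more messages and returns a MessageSet holding the parsed messages and the number of bytes consumed from the string. Parsing stops at a truncated trailing message, which is left unconsumed; gzip- and snappy-compressed message sets are decompressed and parsed recursively.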
# File 'lib/kafka/message.rb', line 65
def self.parse_from(data)
  messages = []
  bytes_processed = 0

  while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER
    message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
    break if bytes_processed + message_size + 4 > data.length # message is truncated

    case magic
    when MAGIC_IDENTIFIER_DEFAULT
      # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 ...
      # |                   |     |                   |
      # |   message_size    |magic|     checksum      | payload ...
      payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
      checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
      payload  = data[bytes_processed + 9, payload_size]
      messages << Kafka::Message.new(payload, magic, checksum)

    when MAGIC_IDENTIFIER_COMPRESSION
      # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 ...
      # |                   |     |     |                   |
      # |       size        |magic|attrs|     checksum      | payload ...
      payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
      attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
      payload = data[bytes_processed + 10, payload_size]

      case attributes & COMPRESSION_CODEC_MASK
      when NO_COMPRESSION # a single uncompressed message
        messages << Kafka::Message.new(payload, magic, checksum)
      when GZIP_COMPRESSION # a gzip-compressed message set -- parse recursively
        uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
        message_set = parse_from(uncompressed)
        raise 'malformed compressed message' if message_set.size != uncompressed.size
        messages.concat(message_set.messages)
      when SNAPPY_COMPRESSION # a snappy-compressed message set -- parse recursively
        ensure_snappy! do
          uncompressed = Snappy::Reader.new(StringIO.new(payload)).read
          message_set = parse_from(uncompressed)
          raise 'malformed compressed message' if message_set.size != uncompressed.size
          messages.concat(message_set.messages)
        end
      else
        # https://cwiki.apache.org/confluence/display/KAFKA/Compression
        raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
      end

    else
      raise "Unsupported Kafka message version: magic number #{magic}"
    end

    bytes_processed += message_size + 4 # 4 = sizeof(message_size)
  end

  MessageSet.new(bytes_processed, messages)
end
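The parsing loop can be exercised in isolation with a simplified sketch. parse_sketch and frame are hypothetical helpers: the sketch handles magic 0 and uncompressed magic 1 messages only, returns raw payloads instead of Message objects, and uses byte-based String methods (bytesize, byteslice) in place of length and []. Like parse_from, it stops at a truncated trailing message and reports only the bytes it consumed.

```ruby
require "zlib"

# Hypothetical, simplified version of the parse_from loop: handles magic 0
# and uncompressed magic 1 messages only, returning [payloads, bytes_consumed].
def parse_sketch(data)
  payloads = []
  bytes_processed = 0
  while bytes_processed <= data.bytesize - 5
    message_size, magic = data.byteslice(bytes_processed, 5).unpack("NC")
    break if bytes_processed + message_size + 4 > data.bytesize # truncated tail
    case magic
    when 0 # size | magic | checksum | payload
      payloads << data.byteslice(bytes_processed + 9, message_size - 5)
    when 1 # size | magic | attrs | checksum | payload
      attributes = data.getbyte(bytes_processed + 5)
      raise "compressed sets not handled in this sketch" unless (attributes & 0x03).zero?
      payloads << data.byteslice(bytes_processed + 10, message_size - 6)
    else
      raise "Unsupported Kafka message version: magic number #{magic}"
    end
    bytes_processed += message_size + 4
  end
  [payloads, bytes_processed]
end

# Hypothetical framing helper for building test input (v1 uses attrs = 0).
def frame(payload, magic)
  header = magic.zero? ? [0].pack("C") : [1, 0].pack("CC")
  body = header + [Zlib.crc32(payload)].pack("N") + payload.b
  [body.bytesize].pack("N") + body
end

buffer = frame("a", 0) + frame("bc", 1)
truncated = buffer + frame("xyz", 0).byteslice(0, 7) # partial third message
payloads, consumed = parse_sketch(truncated)
p payloads                    # ["a", "bc"]
p consumed == buffer.bytesize # true
```

Returning the consumed byte count lets a caller keep the unconsumed tail and retry once more data has arrived.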
Instance Method Details
#calculate_checksum ⇒ Object
Returns the CRC32 checksum of the payload, computed with Zlib.crc32.
# File 'lib/kafka/message.rb', line 54
def calculate_checksum
  Zlib.crc32(self.payload)
end
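#encode(compression = NO_COMPRESSION) ⇒ Object
Encodes the message for the wire. The payload is passed through asciify_payload and, when a compression codec is requested, compress_payload; note that this replaces self.payload with the processed payload. The result is the header bytes from magic_and_compression, the CRC32 checksum of the final (possibly compressed) payload as a 4-byte big-endian int, and the payload, all prefixed by their total length as a 4-byte big-endian int.
# File 'lib/kafka/message.rb', line 121
def encode(compression = NO_COMPRESSION)
  @compression = compression

  self.payload = asciify_payload
  self.payload = compress_payload if compression?

  data = magic_and_compression + [calculate_checksum].pack("N") + payload

  [data.length].pack("N") + data
end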
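Since the checksum in encode is taken after compression, a compressed message carries the CRC32 of the compressed bytes. The sketch below builds a gzip-compressed message set by hand and reads it back the way parse_from does for GZIP_COMPRESSION. The helpers frame_v0 and frame_v1 are hypothetical; the header bytes follow the Overview's magic-1 layout rather than the private magic_and_compression method, whose body is not shown on this page.

```ruby
require "zlib"
require "stringio"

# Hypothetical helpers (not kafka-rb API): inner messages are framed,
# concatenated and gzipped; the result becomes the payload of one
# magic-1 message whose attributes select GZIP_COMPRESSION.
GZIP_COMPRESSION = 1

def frame_v0(payload)
  body = [0].pack("C") + [Zlib.crc32(payload)].pack("N") + payload.b
  [body.bytesize].pack("N") + body
end

def frame_v1(payload, attributes)
  # The checksum covers the payload exactly as stored, i.e. after compression.
  body = [1, attributes].pack("CC") + [Zlib.crc32(payload)].pack("N") + payload
  [body.bytesize].pack("N") + body
end

inner_set  = frame_v0("one") + frame_v0("two")
compressed = Zlib.gzip(inner_set)
wrapper    = frame_v1(compressed, GZIP_COMPRESSION)

# Read it back: length, magic, attributes, checksum, then the payload.
size, magic, attributes, checksum = wrapper.unpack("NCCN")
payload = wrapper.byteslice(10, size - 6)
uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read

puts magic                                 # 1
puts attributes & 0x03                     # 1
puts checksum == Zlib.crc32(payload)       # true
puts uncompressed.bytes == inner_set.bytes # true
```

The comparison uses bytes because GzipReader#read without a length tags its result with the default external encoding rather than binary.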
#ensure_snappy!(&block) ⇒ Object
Delegates to the class method .ensure_snappy!, passing the block through.
# File 'lib/kafka/message.rb', line 145
def ensure_snappy!(&block)
  self.class.ensure_snappy!(&block)
end
#valid? ⇒ Boolean
Returns true if the stored checksum equals the CRC32 of the current payload (see #calculate_checksum).
# File 'lib/kafka/message.rb', line 58
def valid?
  self.checksum == calculate_checksum
end
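Because valid? compares the stored checksum with the CRC32 of the current payload, any change to the payload after the checksum was taken is detected; CRC32 is guaranteed to catch every single-bit error. The sketch below reproduces the comparison with Zlib directly (the variable names are illustrative) and flips one bit to simulate corruption in transit.

```ruby
require "zlib"

# Illustration of the valid? comparison: stored checksum vs. CRC32 of payload.
stored_payload  = "payload bytes"
stored_checksum = Zlib.crc32(stored_payload) # what a producer would send

valid = stored_checksum == Zlib.crc32(stored_payload)

# Flip the lowest bit of the first byte to simulate corruption in transit.
corrupted = stored_payload.b
corrupted.setbyte(0, corrupted.getbyte(0) ^ 0x01)
still_valid = stored_checksum == Zlib.crc32(corrupted)

puts valid       # true
puts still_valid # false
```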