66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/kafka/protocol/message.rb', line 66
# Builds a message from the data exposed by +decoder+.
#
# An int64 offset is read first, followed by a byte string that is wrapped in
# its own Decoder and holds, in order: an int32 CRC, an int8 magic byte, an
# int8 attributes field, an int64 timestamp (only when the magic byte is 1),
# and the key and value byte strings.
#
# @param decoder [#int64, #bytes] source positioned at the start of a message.
# @return a new instance built from the decoded key, value, codec id, offset
#   and creation time.
# @raise [Kafka::Error] if the magic byte is neither 0 nor 1.
def self.decode(decoder)
  offset = decoder.int64
  payload = Decoder.from_string(decoder.bytes)

  # The CRC has to be consumed to reach the following fields; it is not
  # checked here.
  payload.int32
  version = payload.int8
  flags = payload.int8

  # Only magic byte 1 carries a timestamp field. A stored value of zero is
  # treated the same as having no timestamp at all.
  millis =
    case version
    when 0
      nil
    when 1
      stamp = payload.int64
      stamp.zero? ? nil : stamp
    else
      raise Kafka::Error, "Invalid magic byte: #{version}"
    end

  key = payload.bytes
  value = payload.bytes

  new(
    key: key,
    value: value,
    # The codec id is the three least significant bits of the attributes.
    codec_id: flags & 0b111,
    offset: offset,
    # The timestamp is divided by 1000.0 because Time.at expects seconds.
    create_time: millis && Time.at(millis / 1000.0)
  )
end
|