Method: Kafka::Protocol::Message.decode

Defined in:
lib/kafka/protocol/message.rb

.decode(decoder) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/kafka/protocol/message.rb', line 66

def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  _crc = message_decoder.int32
  magic_byte = message_decoder.int8
  attributes = message_decoder.int8

  # The magic byte indicates the message format version. There are situations
  # where an old message format can be returned from a newer version of Kafka,
  # because old messages are not necessarily rewritten on upgrades.
  case magic_byte
  when 0
    # No timestamp in the pre-0.10 message format.
    timestamp = nil
  when 1
    timestamp = message_decoder.int64

    # If the timestamp is set to zero, it's because the message has been upgraded
    # from the Kafka 0.9 disk format to the Kafka 0.10 format. The former didn't
    # have a timestamp attribute, so we'll just set the timestamp to nil.
    timestamp = nil if timestamp.zero?
  else
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  key = message_decoder.bytes
  value = message_decoder.bytes

  # The codec id is encoded in the three least significant bits of the
  # attributes.
  codec_id = attributes & 0b111

  # The timestamp will be nil if the message was written in the Kafka 0.9 log format.
  create_time = timestamp && Time.at(timestamp / 1000.0)

  new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
end