Class: Kafka::Protocol::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/message.rb

Overview

## API Specification

Message => Crc MagicByte Attributes Timestamp Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64, in ms
    Key => bytes
    Value => bytes

Constant Summary collapse

MAGIC_BYTE =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)) ⇒ Message

Returns a new instance of Message.



26
27
28
29
30
31
32
33
34
# File 'lib/kafka/protocol/message.rb', line 26

def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
  @key = key
  @value = value
  @codec_id = codec_id
  @offset = offset
  @create_time = create_time

  @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
end

Instance Attribute Details

#bytesizeObject (readonly)

Returns the value of attribute bytesize.



24
25
26
# File 'lib/kafka/protocol/message.rb', line 24

def bytesize
  @bytesize
end

#codec_idObject (readonly)

Returns the value of attribute codec_id.



22
23
24
# File 'lib/kafka/protocol/message.rb', line 22

def codec_id
  @codec_id
end

#create_timeObject (readonly)

Returns the value of attribute create_time.



24
25
26
# File 'lib/kafka/protocol/message.rb', line 24

def create_time
  @create_time
end

#keyObject (readonly)

Returns the value of attribute key.



22
23
24
# File 'lib/kafka/protocol/message.rb', line 22

def key
  @key
end

#offsetObject (readonly)

Returns the value of attribute offset.



22
23
24
# File 'lib/kafka/protocol/message.rb', line 22

def offset
  @offset
end

#valueObject (readonly)

Returns the value of attribute value.



22
23
24
# File 'lib/kafka/protocol/message.rb', line 22

def value
  @value
end

Class Method Details

.decode(decoder) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/kafka/protocol/message.rb', line 66

def self.decode(decoder)
  offset = decoder.int64
  message_decoder = Decoder.from_string(decoder.bytes)

  _crc = message_decoder.int32
  magic_byte = message_decoder.int8
  attributes = message_decoder.int8

  # The magic byte indicates the message format version. There are situations
  # where an old message format can be returned from a newer version of Kafka,
  # because old messages are not necessarily rewritten on upgrades.
  case magic_byte
  when 0
    # No timestamp in the pre-0.10 message format.
    timestamp = nil
  when 1
    timestamp = message_decoder.int64

    # If the timestamp is set to zero, it's because the message has been upgraded
    # from the Kafka 0.9 disk format to the Kafka 0.10 format. The former didn't
    # have a timestamp attribute, so we'll just set the timestamp to nil.
    timestamp = nil if timestamp.zero?
  else
    raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
  end

  key = message_decoder.bytes
  value = message_decoder.bytes

  # The codec id is encoded in the three least significant bits of the
  # attributes.
  codec_id = attributes & 0b111

  # The timestamp will be nil if the message was written in the Kafka 0.9 log format.
  create_time = timestamp && Time.at(timestamp / 1000.0)

  new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
end

Instance Method Details

#==(other) ⇒ Object



43
44
45
46
47
48
# File 'lib/kafka/protocol/message.rb', line 43

def ==(other)
  @key == other.key &&
    @value == other.value &&
    @codec_id == other.codec_id &&
    @offset == other.offset
end

#compressed?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/kafka/protocol/message.rb', line 50

def compressed?
  @codec_id != 0
end

#decompressArray<Kafka::Protocol::Message>

Returns:



55
56
57
58
59
60
61
62
63
64
# File 'lib/kafka/protocol/message.rb', line 55

def decompress
  codec = Compression.find_codec_by_id(@codec_id)

  # For some weird reason we need to cut out the first 20 bytes.
  data = codec.decompress(value)
  message_set_decoder = Decoder.from_string(data)
  message_set = MessageSet.decode(message_set_decoder)

  correct_offsets(message_set.messages)
end

#encode(encoder) ⇒ Object



36
37
38
39
40
41
# File 'lib/kafka/protocol/message.rb', line 36

def encode(encoder)
  data = encode_with_crc

  encoder.write_int64(offset)
  encoder.write_bytes(data)
end

#headersObject



110
111
112
# File 'lib/kafka/protocol/message.rb', line 110

def headers
  {}
end

#is_control_recordObject

Ensure the backward compatibility of Message format from Kafka 0.11.x



106
107
108
# File 'lib/kafka/protocol/message.rb', line 106

def is_control_record
  false
end