Class: Kafka::Protocol::Record
- Inherits:
-
Object
- Object
- Kafka::Protocol::Record
- Defined in:
- lib/kafka/protocol/record.rb
Instance Attribute Summary collapse
-
#attributes ⇒ Object
readonly
Returns the value of attribute attributes.
-
#bytesize ⇒ Object
readonly
Returns the value of attribute bytesize.
-
#create_time ⇒ Object
Returns the value of attribute create_time.
-
#headers ⇒ Object
readonly
Returns the value of attribute headers.
-
#is_control_record ⇒ Object
Returns the value of attribute is_control_record.
-
#key ⇒ Object
readonly
Returns the value of attribute key.
-
#offset ⇒ Object
Returns the value of attribute offset.
-
#offset_delta ⇒ Object
Returns the value of attribute offset_delta.
-
#timestamp_delta ⇒ Object
Returns the value of attribute timestamp_delta.
-
#value ⇒ Object
readonly
Returns the value of attribute value.
Class Method Summary collapse
Instance Method Summary collapse
- #==(other) ⇒ Object
- #encode(encoder) ⇒ Object
-
#initialize(key: nil, value:, headers: {}, attributes: 0, offset_delta: 0, offset: 0, timestamp_delta: 0, create_time: Time.now, is_control_record: false) ⇒ Record
constructor
A new instance of Record.
Constructor Details
#initialize(key: nil, value:, headers: {}, attributes: 0, offset_delta: 0, offset: 0, timestamp_delta: 0, create_time: Time.now, is_control_record: false) ⇒ Record
Returns a new instance of Record.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/kafka/protocol/record.rb', line 7 def initialize( key: nil, value:, headers: {}, attributes: 0, offset_delta: 0, offset: 0, timestamp_delta: 0, create_time: Time.now, is_control_record: false ) @key = key @value = value @headers = headers @attributes = attributes @offset_delta = offset_delta @offset = offset @timestamp_delta = @create_time = create_time @is_control_record = is_control_record @bytesize = @key.to_s.bytesize + @value.to_s.bytesize end |
Instance Attribute Details
#attributes ⇒ Object (readonly)
Returns the value of attribute attributes.
4 5 6 |
# File 'lib/kafka/protocol/record.rb', line 4 def attributes @attributes end |
#bytesize ⇒ Object (readonly)
Returns the value of attribute bytesize.
4 5 6 |
# File 'lib/kafka/protocol/record.rb', line 4 def bytesize @bytesize end |
#create_time ⇒ Object
Returns the value of attribute create_time.
5 6 7 |
# File 'lib/kafka/protocol/record.rb', line 5 def create_time @create_time end |
#headers ⇒ Object (readonly)
Returns the value of attribute headers.
4 5 6 |
# File 'lib/kafka/protocol/record.rb', line 4 def headers @headers end |
#is_control_record ⇒ Object
Returns the value of attribute is_control_record.
5 6 7 |
# File 'lib/kafka/protocol/record.rb', line 5 def is_control_record @is_control_record end |
#key ⇒ Object (readonly)
Returns the value of attribute key.
4 5 6 |
# File 'lib/kafka/protocol/record.rb', line 4 def key @key end |
#offset ⇒ Object
Returns the value of attribute offset.
5 6 7 |
# File 'lib/kafka/protocol/record.rb', line 5 def offset @offset end |
#offset_delta ⇒ Object
Returns the value of attribute offset_delta.
5 6 7 |
# File 'lib/kafka/protocol/record.rb', line 5 def offset_delta @offset_delta end |
#timestamp_delta ⇒ Object
Returns the value of attribute timestamp_delta.
5 6 7 |
# File 'lib/kafka/protocol/record.rb', line 5 def @timestamp_delta end |
#value ⇒ Object (readonly)
Returns the value of attribute value.
4 5 6 |
# File 'lib/kafka/protocol/record.rb', line 4 def value @value end |
Class Method Details
.decode(decoder) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/kafka/protocol/record.rb', line 59 def self.decode(decoder) record_decoder = Decoder.from_string(decoder.varint_bytes) attributes = record_decoder.int8 = record_decoder.varint offset_delta = record_decoder.varint key = record_decoder.varint_string value = record_decoder.varint_bytes headers = {} record_decoder.varint_array do header_key = record_decoder.varint_string header_value = record_decoder.varint_bytes headers[header_key] = header_value end new( key: key, value: value, headers: headers, attributes: attributes, offset_delta: offset_delta, timestamp_delta: ) end |
Instance Method Details
#==(other) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/kafka/protocol/record.rb', line 52 def ==(other) offset_delta == other.offset_delta && == other. && offset == other.offset && is_control_record == other.is_control_record end |
#encode(encoder) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/kafka/protocol/record.rb', line 32 def encode(encoder) record_buffer = StringIO.new record_encoder = Encoder.new(record_buffer) record_encoder.write_int8(@attributes) record_encoder.write_varint(@timestamp_delta) record_encoder.write_varint(@offset_delta) record_encoder.write_varint_string(@key) record_encoder.write_varint_bytes(@value) record_encoder.write_varint_array(@headers.to_a) do |header_key, header_value| record_encoder.write_varint_string(header_key.to_s) record_encoder.write_varint_bytes(header_value.to_s) end encoder.write_varint_bytes(record_buffer.string) end |