Class: Kafka::Protocol::ProduceRequest

Inherits:
Object
Defined in:
lib/kafka/protocol/produce_request.rb

Overview

A produce request sends a message set to the server.

## API Specification

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
    RequiredAcks => int16
    Timeout => int32
    Partition => int32
    MessageSetSize => int32

MessageSet => [Offset MessageSize Message]
    Offset => int64
    MessageSize => int32

Message => Crc MagicByte Attributes Key Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Key => bytes
    Value => bytes
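
The grammar above can be made concrete with a small standard-library sketch. It assumes the usual Kafka wire conventions: big-endian integers, `bytes` written as an int32 length followed by the data (with -1 for null), and a CRC32 computed over everything after the Crc field. The helper names `encode_bytes` and `encode_message` are hypothetical and not part of this library. The sketch follows only the message-set layout listed here; the `#encode` method documented on this page writes record batches instead.

```ruby
require "zlib"

# Hypothetical helper: "bytes" is an int32 length followed by the data,
# with a length of -1 standing for null.
def encode_bytes(value)
  return [-1].pack("l>") if value.nil?

  data = value.b
  [data.bytesize].pack("l>") + data
end

# Hypothetical helper: builds Crc MagicByte Attributes Key Value.
def encode_message(key:, value:, magic_byte: 0, attributes: 0)
  body = [magic_byte, attributes].pack("cc") + encode_bytes(key) + encode_bytes(value)
  # Zlib.crc32 returns an unsigned value; "N" writes the same 32 bits
  # that a signed int32 field would carry.
  [Zlib.crc32(body)].pack("N") + body
end

message = encode_message(key: nil, value: "hello")

# One MessageSet entry: Offset (int64), MessageSize (int32), then the Message.
entry = [0].pack("q>") + [message.bytesize].pack("l>") + message

puts "message: #{message.bytesize} bytes, entry: #{entry.bytesize} bytes"
```

Keeping the length prefix in one helper means null keys and null values are handled the same way wherever a `bytes` field appears.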

Constructor Details

#initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) ⇒ ProduceRequest

Returns a new instance of ProduceRequest.

Parameters:

  • required_acks (Integer)
  • timeout (Integer)
  • messages_for_topics (Hash)


# File 'lib/kafka/protocol/produce_request.rb', line 35

def initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil)
  @transactional_id = transactional_id
  @required_acks = required_acks
  @timeout = timeout
  @messages_for_topics = messages_for_topics
  @compressor = compressor
end

Instance Attribute Details

#compressor ⇒ Object (readonly)

Returns the value of attribute compressor.



# File 'lib/kafka/protocol/produce_request.rb', line 30

def compressor
  @compressor
end

#messages_for_topics ⇒ Object (readonly)

Returns the value of attribute messages_for_topics.



# File 'lib/kafka/protocol/produce_request.rb', line 30

def messages_for_topics
  @messages_for_topics
end

#required_acks ⇒ Object (readonly)

Returns the value of attribute required_acks.



# File 'lib/kafka/protocol/produce_request.rb', line 30

def required_acks
  @required_acks
end

#timeout ⇒ Object (readonly)

Returns the value of attribute timeout.



# File 'lib/kafka/protocol/produce_request.rb', line 30

def timeout
  @timeout
end

#transactional_id ⇒ Object (readonly)

Returns the value of attribute transactional_id.



# File 'lib/kafka/protocol/produce_request.rb', line 30

def transactional_id
  @transactional_id
end

Instance Method Details

#api_key ⇒ Object



# File 'lib/kafka/protocol/produce_request.rb', line 43

def api_key
  PRODUCE_API
end

#api_version ⇒ Object



# File 'lib/kafka/protocol/produce_request.rb', line 47

def api_version
  3
end

#encode(encoder) ⇒ Object



# File 'lib/kafka/protocol/produce_request.rb', line 63

def encode(encoder)
  encoder.write_string(@transactional_id)
  encoder.write_int16(@required_acks)
  encoder.write_int32(@timeout)

  encoder.write_array(@messages_for_topics) do |topic, messages_for_partition|
    encoder.write_string(topic)

    encoder.write_array(messages_for_partition) do |partition, record_batch|
      encoder.write_int32(partition)

      record_batch.fulfill_relative_data
      encoded_record_batch = compress(record_batch)
      encoder.write_bytes(encoded_record_batch)
    end
  end
end
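
The write order in `#encode` is easier to see with an encoder that records calls instead of emitting bytes. The sketch below is an illustration only: `RecordingEncoder` is a hypothetical stand-in, its `write_array` assumes an array is written as its size followed by each element, and plain strings take the place of compressed record batches (the `fulfill_relative_data` and `compress` steps are skipped).

```ruby
# Hypothetical stand-in for the encoder: it records each call so the
# field order can be inspected.
class RecordingEncoder
  attr_reader :calls

  def initialize
    @calls = []
  end

  def write_string(value)
    @calls << [:string, value]
  end

  def write_int16(value)
    @calls << [:int16, value]
  end

  def write_int32(value)
    @calls << [:int32, value]
  end

  def write_bytes(value)
    @calls << [:bytes, value]
  end

  # Assumption: an array is its element count followed by each element.
  def write_array(collection, &block)
    @calls << [:array_size, collection.size]
    collection.each(&block)
  end
end

encoder = RecordingEncoder.new
messages_for_topics = { "greetings" => { 0 => "batch-bytes" } }

# Same call sequence as #encode, with literal values for the attributes.
encoder.write_string(nil)    # transactional_id
encoder.write_int16(1)       # required_acks
encoder.write_int32(10_000)  # timeout
encoder.write_array(messages_for_topics) do |topic, messages_for_partition|
  encoder.write_string(topic)
  encoder.write_array(messages_for_partition) do |partition, encoded_batch|
    encoder.write_int32(partition)
    encoder.write_bytes(encoded_batch)
  end
end

encoder.calls.each { |kind, value| puts "#{kind}: #{value.inspect}" }
```

The recorded sequence shows the transactional id leading the request, followed by the acknowledgement and timeout fields, then one nested array per topic and per partition.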

#requires_acks? ⇒ Boolean

Reports whether this request requires any acknowledgements. If none are required, the server does not send back a response.

Returns:

  • (Boolean)

    true if acknowledgements are required, false otherwise.



# File 'lib/kafka/protocol/produce_request.rb', line 59

def requires_acks?
  @required_acks != 0
end

#response_classObject



# File 'lib/kafka/protocol/produce_request.rb', line 51

def response_class
  requires_acks? ? Protocol::ProduceResponse : nil
end
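
A caller can use `#requires_acks?` and `#response_class` together to decide whether to wait for a reply. The following is a minimal sketch under stated assumptions: `ProduceRequestSketch`, `ProduceResponseStub`, and `describe_send` are hypothetical names that copy only the two method bodies shown above, so the example runs without the library.

```ruby
# Hypothetical stand-in for Protocol::ProduceResponse.
ProduceResponseStub = Class.new

# Hypothetical stand-in that copies the two predicate methods above.
class ProduceRequestSketch
  def initialize(required_acks:)
    @required_acks = required_acks
  end

  def requires_acks?
    @required_acks != 0
  end

  def response_class
    requires_acks? ? ProduceResponseStub : nil
  end
end

# Hypothetical caller: wait for a reply only when a response class exists.
def describe_send(request)
  if request.response_class
    "wait for #{request.response_class}"
  else
    "fire and forget"
  end
end

puts describe_send(ProduceRequestSketch.new(required_acks: 0))
puts describe_send(ProduceRequestSketch.new(required_acks: -1))
```

Returning `nil` from `response_class` gives the caller a single value to branch on, so no separate flag is needed for requests that expect no reply.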