Class: Kafka::Protocol::ProduceRequest
- Inherits:
-
Object
- Object
- Kafka::Protocol::ProduceRequest
- Defined in:
- lib/kafka/protocol/produce_request.rb
Overview
A produce request sends a message set to the server.
API Specification
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
Constant Summary collapse
- API_MIN_VERSION =
3
Instance Attribute Summary collapse
-
#compressor ⇒ Object
readonly
Returns the value of attribute compressor.
-
#messages_for_topics ⇒ Object
readonly
Returns the value of attribute messages_for_topics.
-
#required_acks ⇒ Object
readonly
Returns the value of attribute required_acks.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#transactional_id ⇒ Object
readonly
Returns the value of attribute transactional_id.
Instance Method Summary collapse
- #api_key ⇒ Object
- #api_version ⇒ Object
- #encode(encoder) ⇒ Object
-
#initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) ⇒ ProduceRequest
constructor
A new instance of ProduceRequest.
-
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all.
- #response_class ⇒ Object
Constructor Details
#initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) ⇒ ProduceRequest
Returns a new instance of ProduceRequest.
37 38 39 40 41 42 43 |
# File 'lib/kafka/protocol/produce_request.rb', line 37 def initialize(transactional_id: nil, required_acks:, timeout:, messages_for_topics:, compressor: nil) @transactional_id = transactional_id @required_acks = required_acks @timeout = timeout @messages_for_topics = messages_for_topics @compressor = compressor end |
Instance Attribute Details
#compressor ⇒ Object (readonly)
Returns the value of attribute compressor.
32 33 34 |
# File 'lib/kafka/protocol/produce_request.rb', line 32 def compressor @compressor end |
#messages_for_topics ⇒ Object (readonly)
Returns the value of attribute messages_for_topics.
32 33 34 |
# File 'lib/kafka/protocol/produce_request.rb', line 32 def messages_for_topics @messages_for_topics end |
#required_acks ⇒ Object (readonly)
Returns the value of attribute required_acks.
32 33 34 |
# File 'lib/kafka/protocol/produce_request.rb', line 32 def required_acks @required_acks end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
32 33 34 |
# File 'lib/kafka/protocol/produce_request.rb', line 32 def timeout @timeout end |
#transactional_id ⇒ Object (readonly)
Returns the value of attribute transactional_id.
32 33 34 |
# File 'lib/kafka/protocol/produce_request.rb', line 32 def transactional_id @transactional_id end |
Instance Method Details
#api_key ⇒ Object
45 46 47 |
# File 'lib/kafka/protocol/produce_request.rb', line 45 def api_key PRODUCE_API end |
#api_version ⇒ Object
49 50 51 |
# File 'lib/kafka/protocol/produce_request.rb', line 49 def api_version compressor.codec.nil? ? API_MIN_VERSION : [compressor.codec.produce_api_min_version, API_MIN_VERSION].max end |
#encode(encoder) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/kafka/protocol/produce_request.rb', line 65 def encode(encoder) encoder.write_string(@transactional_id) encoder.write_int16(@required_acks) encoder.write_int32(@timeout) encoder.write_array(@messages_for_topics) do |topic, messages_for_partitions| encoder.write_string(topic) encoder.write_array(messages_for_partitions) do |partition, record_batch| encoder.write_int32(partition) record_batch.fulfill_relative_data encoded_record_batch = compress(record_batch) encoder.write_bytes(encoded_record_batch) end end end |
#requires_acks? ⇒ Boolean
Whether this request requires any acknowledgements at all. If no acknowledgements are required, the server will not send back a response at all.
61 62 63 |
# File 'lib/kafka/protocol/produce_request.rb', line 61 def requires_acks? @required_acks != 0 end |
#response_class ⇒ Object
53 54 55 |
# File 'lib/kafka/protocol/produce_request.rb', line 53 def response_class requires_acks? ? Protocol::ProduceResponse : nil end |