Class: Kafka::Protocol::FetchRequest
- Inherits:
-
Object
- Object
- Kafka::Protocol::FetchRequest
- Defined in:
- lib/kafka/protocol/fetch_request.rb
Overview
A request to fetch messages from a given partition.
API Specification
FetchRequest => ReplicaId MaxWaitTime MinBytes MaxBytes IsolationLevel [TopicName [Partition FetchOffset MaxBytes]]
ReplicaId => int32
MaxWaitTime => int32
MinBytes => int32
MaxBytes => int32
IsolationLevel => int8
TopicName => string
Partition => int32
FetchOffset => int64
MaxBytes => int32
Constant Summary collapse
- ISOLATION_READ_UNCOMMITTED =
0
- ISOLATION_READ_COMMITTED =
1
Instance Method Summary collapse
- #api_key ⇒ Object
- #api_version ⇒ Object
- #encode(encoder) ⇒ Object
-
#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest
constructor
A new instance of FetchRequest.
- #response_class ⇒ Object
Constructor Details
#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest
Returns a new instance of FetchRequest.
28 29 30 31 32 33 34 |
# File 'lib/kafka/protocol/fetch_request.rb', line 28 def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) @replica_id = REPLICA_ID @max_wait_time = max_wait_time @min_bytes = min_bytes @max_bytes = max_bytes @topics = topics end |
Instance Method Details
#api_key ⇒ Object
36 37 38 |
# File 'lib/kafka/protocol/fetch_request.rb', line 36 def api_key FETCH_API end |
#api_version ⇒ Object
40 41 42 |
# File 'lib/kafka/protocol/fetch_request.rb', line 40 def api_version 4 end |
#encode(encoder) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/kafka/protocol/fetch_request.rb', line 48 def encode(encoder) encoder.write_int32(@replica_id) encoder.write_int32(@max_wait_time) encoder.write_int32(@min_bytes) encoder.write_int32(@max_bytes) encoder.write_int8(ISOLATION_READ_COMMITTED) encoder.write_array(@topics) do |topic, partitions| encoder.write_string(topic) encoder.write_array(partitions) do |partition, config| fetch_offset = config.fetch(:fetch_offset) max_bytes = config.fetch(:max_bytes) encoder.write_int32(partition) encoder.write_int64(fetch_offset) encoder.write_int32(max_bytes) end end end |
#response_class ⇒ Object
44 45 46 |
# File 'lib/kafka/protocol/fetch_request.rb', line 44 def response_class Protocol::FetchResponse end |