Class: Kafka::Protocol::FetchRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/protocol/fetch_request.rb

Overview

A request to fetch messages from one or more partitions, grouped by topic.

API Specification

FetchRequest => ReplicaId MaxWaitTime MinBytes MaxBytes IsolationLevel [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  MaxBytes => int32
  IsolationLevel => int8
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32

Constant Summary collapse

ISOLATION_READ_UNCOMMITTED =
0
ISOLATION_READ_COMMITTED =
1

Instance Method Summary collapse

Constructor Details

#initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) ⇒ FetchRequest

Returns a new instance of FetchRequest.

Parameters:

  • max_wait_time (Integer)
  • min_bytes (Integer)
  • max_bytes (Integer)
  • topics (Hash)


28
29
30
31
32
33
34
# File 'lib/kafka/protocol/fetch_request.rb', line 28

# Builds a fetch request covering the given topics and partitions.
#
# The replica id is not configurable; it is always taken from REPLICA_ID.
#
# @param max_wait_time [Integer] value written as the MaxWaitTime field
# @param min_bytes [Integer] value written as the MinBytes field
# @param max_bytes [Integer] value written as the request-level MaxBytes field
# @param topics [Hash] topic name => partitions, where each partition maps to
#   a config providing :fetch_offset and :max_bytes (as read by #encode)
def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:)
  @replica_id = REPLICA_ID
  @max_bytes = max_bytes
  @min_bytes = min_bytes
  @max_wait_time = max_wait_time
  @topics = topics
end

Instance Method Details

#api_key ⇒ Object



36
37
38
# File 'lib/kafka/protocol/fetch_request.rb', line 36

# Identifies which API this request belongs to.
#
# @return [Object] the value of the FETCH_API constant
def api_key
  FETCH_API
end

#api_version ⇒ Object



40
41
42
# File 'lib/kafka/protocol/fetch_request.rb', line 40

# Reports the API version this request is encoded for.
#
# @return [Integer] always 4
def api_version
  4
end

#encode(encoder) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/kafka/protocol/fetch_request.rb', line 48

# Serializes this request through the given encoder.
#
# Writes, in order: the replica id, max wait time, min bytes and max bytes as
# int32 values, the isolation level as an int8, and then an array with one
# entry per topic. Each topic entry holds the topic name followed by an array
# of partition entries (partition as int32, fetch offset as int64, max bytes
# as int32).
#
# @param encoder [#write_int8, #write_int32, #write_int64, #write_string, #write_array]
#   the object receiving the encoded fields
# @return [Object] whatever the outer encoder.write_array call returns
# @raise [KeyError] if a partition's config Hash lacks :fetch_offset or :max_bytes
def encode(encoder)
  [@replica_id, @max_wait_time, @min_bytes, @max_bytes].each do |header_field|
    encoder.write_int32(header_field)
  end

  # The isolation level is not configurable here; read-committed is always sent.
  encoder.write_int8(ISOLATION_READ_COMMITTED)

  encoder.write_array(@topics) do |topic_name, partition_configs|
    encoder.write_string(topic_name)

    encoder.write_array(partition_configs) do |partition_id, settings|
      # Look up both settings before writing anything, so a missing key
      # raises before the partition entry is partially written.
      offset = settings.fetch(:fetch_offset)
      byte_limit = settings.fetch(:max_bytes)

      encoder.write_int32(partition_id)
      encoder.write_int64(offset)
      encoder.write_int32(byte_limit)
    end
  end
end

#response_class ⇒ Object



44
45
46
# File 'lib/kafka/protocol/fetch_request.rb', line 44

# Names the class associated with responses to this request.
#
# @return [Class] Protocol::FetchResponse
def response_class
  Protocol::FetchResponse
end