Class: Kafka::Protocol::MetadataResponse

Inherits:
Object
Defined in:
lib/kafka/protocol/metadata_response.rb

Overview

A response to a MetadataRequest.

The response contains information on the brokers, topics, and partitions in the cluster.

  • For each broker a node id, host, and port are provided.
  • For each topic partition the node id of the broker acting as partition leader, as well as a list of node ids for the set of replicas, are given. The isr list is the subset of replicas that are "in sync", i.e. have fully caught up with the leader.
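
Because the isr list is a subset of the replica list, the difference between the two gives the replicas that are assigned but not currently in sync. The following is a minimal sketch of that comparison; `PartitionSketch` is a hypothetical stand-in defined only for this example (it is not the library's PartitionMetadata class), and the node ids are made up.

```ruby
# Hypothetical stand-in for one partition's metadata. The field names follow
# the PartitionMetadata entry of the specification; this is not the real class.
PartitionSketch = Struct.new(:partition_id, :leader, :replicas, :isr, keyword_init: true)

partitions = [
  PartitionSketch.new(partition_id: 0, leader: 1, replicas: [1, 2, 3], isr: [1, 2, 3]),
  PartitionSketch.new(partition_id: 1, leader: 2, replicas: [2, 3, 1], isr: [2]),
]

# For each partition, the replicas that are assigned but not in the isr list.
lagging = partitions.to_h { |p| [p.partition_id, p.replicas - p.isr] }

# Partitions with at least one replica that has not caught up with the leader.
under_replicated = lagging.reject { |_id, nodes| nodes.empty? }.keys
```

Here partition 0 is fully in sync, while partition 1 has replicas 3 and 1 outside the isr list.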

API Specification

MetadataResponse => [Broker] ControllerId [TopicMetadata]
  Broker => NodeId Host Port Rack  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
    Rack => string

  ControllerId => int32

  TopicMetadata => TopicErrorCode TopicName IsInternal [PartitionMetadata]
    TopicErrorCode => int16
    TopicName => string
    IsInternal => boolean

  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]
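
The primitive types in the specification follow the Kafka wire format: integers are big-endian, a string is an int16 length followed by that many bytes, and an array (written `[T]`) is an int32 element count followed by the elements. The sketch below reads a `Host` string and a `Replicas` array from hand-built bytes. `WireReaderSketch` is a hypothetical helper written for this example; it is not the library's Decoder class, and it does not handle null strings (length -1).

```ruby
require "stringio"

# Hypothetical minimal reader for illustration only (not Kafka::Protocol::Decoder).
class WireReaderSketch
  def initialize(bytes)
    @io = StringIO.new(bytes)
  end

  # int16: two bytes, big-endian, signed.
  def int16
    @io.read(2).unpack1("s>")
  end

  # int32: four bytes, big-endian, signed.
  def int32
    @io.read(4).unpack1("l>")
  end

  # string: int16 length prefix, then that many bytes.
  def string
    @io.read(int16)
  end

  # [T]: int32 element count, then the block is called once per element.
  def array
    Array.new(int32) { yield }
  end
end

# Host => string, encoded as length 6 followed by the bytes of "kafka1".
bytes = [6].pack("s>") + "kafka1"
# Replicas => [int32], encoded as count 3 followed by node ids 1, 2, 3.
bytes += [3, 1, 2, 3].pack("l>*")

reader = WireReaderSketch.new(bytes)
host = reader.string
replicas = reader.array { reader.int32 }
```

Fields must be read in exactly the order in which they were written, since the format carries no field names.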

Defined Under Namespace

Classes: PartitionMetadata, TopicMetadata

Constructor Details

#initialize(brokers:, controller_id:, topics:) ⇒ MetadataResponse

Returns a new instance of MetadataResponse.



# File 'lib/kafka/protocol/metadata_response.rb', line 75

def initialize(brokers:, controller_id:, topics:)
  @brokers = brokers
  @controller_id = controller_id
  @topics = topics
end

Instance Attribute Details

#brokers ⇒ Array<Kafka::BrokerInfo> (readonly)

Returns the list of brokers in the cluster.

Returns:

  • (Array<Kafka::BrokerInfo>)

    the list of brokers in the cluster.



# File 'lib/kafka/protocol/metadata_response.rb', line 67

def brokers
  @brokers
end

#controller_id ⇒ Integer (readonly)

Returns the broker id of the controller broker.

Returns:

  • (Integer)

    The broker id of the controller broker.



# File 'lib/kafka/protocol/metadata_response.rb', line 73

def controller_id
  @controller_id
end

#topics ⇒ Array<TopicMetadata> (readonly)

Returns the list of topics in the cluster.

Returns:

  • (Array<TopicMetadata>)

    the list of topics in the cluster.



# File 'lib/kafka/protocol/metadata_response.rb', line 70

def topics
  @topics
end

Class Method Details

.decode(decoder) ⇒ MetadataResponse

Decodes a MetadataResponse from a Decoder containing response data.

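Parameters:

  • decoder (Decoder)

    the decoder containing the response data.

Returns:

  • (MetadataResponse)

    the decoded metadata response.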



# File 'lib/kafka/protocol/metadata_response.rb', line 143

def self.decode(decoder)
  brokers = decoder.array do
    node_id = decoder.int32
    host = decoder.string
    port = decoder.int32
    _rack = decoder.string

    BrokerInfo.new(
      node_id: node_id,
      host: host,
      port: port
    )
  end

  controller_id = decoder.int32

  topics = decoder.array do
    topic_error_code = decoder.int16
    topic_name = decoder.string
    _is_internal = decoder.boolean

    partitions = decoder.array do
      PartitionMetadata.new(
        partition_error_code: decoder.int16,
        partition_id: decoder.int32,
        leader: decoder.int32,
        replicas: decoder.array { decoder.int32 },
        isr: decoder.array { decoder.int32 },
      )
    end

    TopicMetadata.new(
      topic_error_code: topic_error_code,
      topic_name: topic_name,
      partitions: partitions,
    )
  end

  new(brokers: brokers, controller_id: controller_id, topics: topics)
end

Instance Method Details

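#controller_broker ⇒ Kafka::BrokerInfo

Finds the broker info for the controller broker by passing controller_id to find_broker.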



# File 'lib/kafka/protocol/metadata_response.rb', line 123

def controller_broker
  find_broker(controller_id)
end

#find_broker(node_id) ⇒ Kafka::BrokerInfo

Finds the broker info for the given node id.

Parameters:

  • node_id (Integer)

    the node id of the broker.

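Returns:

  • (Kafka::BrokerInfo)

    the broker info for the given node id.

Raises:

  • (Kafka::NoSuchBroker)

    if no broker with the given node id is in the broker list.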



# File 'lib/kafka/protocol/metadata_response.rb', line 115

def find_broker(node_id)
  broker = @brokers.find {|b| b.node_id == node_id }

  raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil?

  broker
end
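
The lookup raises instead of returning nil, so callers need to be prepared for a node id that is absent from the broker list. The sketch below reproduces the same linear search and nil check outside the library; `BrokerSketch`, `NoSuchBrokerSketch`, `BROKERS`, and `find_broker_sketch` are hypothetical stand-ins for Kafka::BrokerInfo, Kafka::NoSuchBroker, the @brokers list, and find_broker, and the hosts are invented.

```ruby
# Hypothetical stand-ins so the example runs without the library.
class NoSuchBrokerSketch < StandardError; end
BrokerSketch = Struct.new(:node_id, :host, :port, keyword_init: true)

BROKERS = [
  BrokerSketch.new(node_id: 1, host: "kafka1", port: 9092),
  BrokerSketch.new(node_id: 2, host: "kafka2", port: 9092),
].freeze

# Same linear search and nil check as find_broker above.
def find_broker_sketch(node_id)
  broker = BROKERS.find { |b| b.node_id == node_id }
  raise NoSuchBrokerSketch, "No broker with id #{node_id}" if broker.nil?
  broker
end

# A known id resolves to a host and port.
controller = find_broker_sketch(2)
address = "#{controller.host}:#{controller.port}"

# An id that is not in the list raises rather than returning nil.
missing_message =
  begin
    find_broker_sketch(42)
  rescue NoSuchBrokerSketch => e
    e.message
  end
```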

#find_leader_id(topic, partition) ⇒ Integer

Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

Returns:

  • (Integer)

    the node id of the leader.



# File 'lib/kafka/protocol/metadata_response.rb', line 87

def find_leader_id(topic, partition)
  topic_info = @topics.find {|t| t.topic_name == topic }

  if topic_info.nil?
    raise UnknownTopicOrPartition, "no topic #{topic}"
  end

  Protocol.handle_error(topic_info.topic_error_code)

  partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

  if partition_info.nil?
    raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
  end

  begin
    Protocol.handle_error(partition_info.partition_error_code)
  rescue ReplicaNotAvailable
    # This error can be safely ignored per the protocol specification.
  end

  partition_info.leader
end
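
The partition-level check treats ReplicaNotAvailable differently from every other error: it is swallowed and the leader is still returned, whereas any other non-zero code propagates. The sketch below isolates that behaviour. `handle_error_sketch` and the two exception classes are hypothetical stand-ins for Protocol.handle_error and the library's error classes, and only two codes are mapped (5 for leader not available and 9 for replica not available, as numbered in the Kafka protocol error table).

```ruby
# Hypothetical stand-ins for the library's error classes.
class LeaderNotAvailableSketch < StandardError; end
class ReplicaNotAvailableSketch < StandardError; end

# Only two protocol error codes are mapped here; the real handler covers many more.
ERROR_CLASSES = {
  5 => LeaderNotAvailableSketch,
  9 => ReplicaNotAvailableSketch,
}.freeze

# Stand-in for Protocol.handle_error: a zero code means success.
def handle_error_sketch(code)
  return if code.zero?
  raise ERROR_CLASSES.fetch(code, StandardError), "error code #{code}"
end

# Mirrors the final step of find_leader_id for a single partition.
def leader_for_sketch(partition_error_code, leader)
  begin
    handle_error_sketch(partition_error_code)
  rescue ReplicaNotAvailableSketch
    # Ignored: the leader is still usable when only a replica is unavailable.
  end

  leader
end

healthy = leader_for_sketch(0, 3)   # no error, leader returned
degraded = leader_for_sketch(9, 3)  # replica unavailable, leader still returned

leaderless =
  begin
    leader_for_sketch(5, -1)
  rescue LeaderNotAvailableSketch
    :raised
  end
```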

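#partitions_for(topic_name) ⇒ Array<PartitionMetadata>

Returns the partition metadata for the given topic. Raises UnknownTopicOrPartition if the topic is not part of this response, or the error that corresponds to the topic's error code if that code is non-zero.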



# File 'lib/kafka/protocol/metadata_response.rb', line 127

def partitions_for(topic_name)
  topic = @topics.find {|t| t.topic_name == topic_name }

  if topic.nil?
    raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
  end

  Protocol.handle_error(topic.topic_error_code)

  topic.partitions
end
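
The returned list can be used to enumerate a topic's partitions or to see how leadership is spread across brokers. The following is a rough sketch of both; `TopicSketch` and `TopicPartitionSketch` are hypothetical stand-ins for TopicMetadata and PartitionMetadata, and the topic name and ids are invented.

```ruby
# Hypothetical stand-ins so the example runs without the library.
TopicPartitionSketch = Struct.new(:partition_id, :leader, keyword_init: true)
TopicSketch = Struct.new(:topic_name, :partitions, keyword_init: true)

topics = [
  TopicSketch.new(
    topic_name: "events",
    partitions: [
      TopicPartitionSketch.new(partition_id: 2, leader: 1),
      TopicPartitionSketch.new(partition_id: 0, leader: 1),
      TopicPartitionSketch.new(partition_id: 1, leader: 2),
    ]
  ),
]

# Same lookup as partitions_for, without the error-code handling.
partitions = topics.find { |t| t.topic_name == "events" }.partitions

# Partitions are not assumed to arrive in id order, so sort explicitly.
partition_ids = partitions.map(&:partition_id).sort

# Partition ids grouped by the node id of their leader.
by_leader = partitions
  .group_by(&:leader)
  .transform_values { |ps| ps.map(&:partition_id).sort }
```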