Class: Kafka::Protocol::MetadataResponse
- Inherits: Object
  - Object
  - Kafka::Protocol::MetadataResponse
- Defined in: lib/kafka/protocol/metadata_response.rb
Overview
A response to a MetadataRequest.
The response contains information on the brokers, topics, and partitions in the cluster.
- For each broker, a node id, host, and port are provided.
- For each topic partition, the node id of the broker acting as partition leader and the list of node ids for the set of replicas are given. The isr list is the subset of replicas that are "in sync", i.e. have fully caught up with the leader.
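The relationship between the replica list and the isr list can be illustrated with a minimal sketch. It assumes a hypothetical stand-in struct, `IsrPartition`, rather than the real PartitionMetadata class, and the helper names `lagging_replicas` and `under_replicated?` are invented for illustration.

```ruby
# Hypothetical stand-in for PartitionMetadata; not part of ruby-kafka.
IsrPartition = Struct.new(:partition_id, :leader, :replicas, :isr, keyword_init: true)

# Replicas that are assigned to the partition but are not in the isr list.
def lagging_replicas(partition)
  partition.replicas - partition.isr
end

# True when at least one assigned replica has not caught up with the leader.
def under_replicated?(partition)
  !lagging_replicas(partition).empty?
end

ISR_SAMPLE = [
  IsrPartition.new(partition_id: 0, leader: 1, replicas: [1, 2, 3], isr: [1, 2, 3]),
  IsrPartition.new(partition_id: 1, leader: 2, replicas: [2, 3, 1], isr: [2, 1]),
].freeze
```

Here partition 1 would report node 3 as lagging, because node 3 appears in replicas but not in isr.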
API Specification
    MetadataResponse => [Broker][TopicMetadata]
      Broker => NodeId Host Port  (any number of brokers may be returned)
        NodeId => int32
        Host => string
        Port => int32

      TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
        TopicErrorCode => int16

      PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
        PartitionErrorCode => int16
        PartitionId => int32
        Leader => int32
        Replicas => [int32]
        Isr => [int32]
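To make the layout concrete, here is a minimal sketch that hand-decodes the [Broker] array from raw bytes. It is not ruby-kafka's own Decoder. It assumes the standard Kafka primitive encodings (big-endian integers, strings prefixed with an int16 byte length, arrays prefixed with an int32 element count), and every helper name below is hypothetical. It follows the specification as listed here and does not cover the extra fields that .decode reads (rack, controller id, is-internal flag).

```ruby
require "stringio"

# Hypothetical helpers for illustration; not part of ruby-kafka.
def read_int16(io)
  io.read(2).unpack1("s>") # signed 16-bit, big-endian
end

def read_int32(io)
  io.read(4).unpack1("l>") # signed 32-bit, big-endian
end

# Assumes a non-null string; a length of -1 (null) is not handled here.
def read_string(io)
  length = read_int16(io)
  io.read(length).force_encoding("UTF-8")
end

# [Broker] => int32 count, then NodeId Host Port for each entry.
def read_broker_array(io)
  count = read_int32(io)
  Array.new(count) do
    { node_id: read_int32(io), host: read_string(io), port: read_int32(io) }
  end
end

# Builds the bytes for one Broker entry so the sketch has input to decode.
def encode_broker(node_id, host, port)
  [node_id].pack("l>") + [host.bytesize].pack("s>") + host.b + [port].pack("l>")
end

SAMPLE_BROKER_BYTES =
  [2].pack("l>") + encode_broker(1, "kafka1", 9092) + encode_broker(2, "kafka2", 9092)

SAMPLE_BROKERS = read_broker_array(StringIO.new(SAMPLE_BROKER_BYTES))
```

The same count-then-elements pattern applies to the nested [PartitionMetadata], Replicas, and Isr arrays.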
Defined Under Namespace
Classes: PartitionMetadata, TopicMetadata
Instance Attribute Summary
- #brokers ⇒ Array<Kafka::BrokerInfo> (readonly)
  The list of brokers in the cluster.
- #controller_id ⇒ Integer (readonly)
  The broker id of the controller broker.
- #topics ⇒ Array<TopicMetadata> (readonly)
  The list of topics in the cluster.
Class Method Summary
- .decode(decoder) ⇒ MetadataResponse
  Decodes a MetadataResponse from a Decoder containing response data.
Instance Method Summary
- #controller_broker ⇒ Object
- #find_broker(node_id) ⇒ Kafka::BrokerInfo
  Finds the broker info for the given node id.
- #find_leader_id(topic, partition) ⇒ Integer
  Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.
- #initialize(brokers:, controller_id:, topics:) ⇒ MetadataResponse (constructor)
  A new instance of MetadataResponse.
- #partitions_for(topic_name) ⇒ Object
Constructor Details
#initialize(brokers:, controller_id:, topics:) ⇒ MetadataResponse
Returns a new instance of MetadataResponse.
    # File 'lib/kafka/protocol/metadata_response.rb', line 75
    def initialize(brokers:, controller_id:, topics:)
      @brokers = brokers
      @controller_id = controller_id
      @topics = topics
    end
Instance Attribute Details
#brokers ⇒ Array<Kafka::BrokerInfo> (readonly)
Returns the list of brokers in the cluster.
    # File 'lib/kafka/protocol/metadata_response.rb', line 67
    def brokers
      @brokers
    end
#controller_id ⇒ Integer (readonly)
Returns the broker id of the controller broker.
    # File 'lib/kafka/protocol/metadata_response.rb', line 73
    def controller_id
      @controller_id
    end
#topics ⇒ Array<TopicMetadata> (readonly)
Returns the list of topics in the cluster.
    # File 'lib/kafka/protocol/metadata_response.rb', line 70
    def topics
      @topics
    end
Class Method Details
.decode(decoder) ⇒ MetadataResponse
Decodes a MetadataResponse from a Decoder containing response data.
    # File 'lib/kafka/protocol/metadata_response.rb', line 143
    def self.decode(decoder)
      brokers = decoder.array do
        node_id = decoder.int32
        host = decoder.string
        port = decoder.int32
        # The rack is read to advance the decoder but is not stored.
        _rack = decoder.string

        BrokerInfo.new(
          node_id: node_id,
          host: host,
          port: port
        )
      end

      controller_id = decoder.int32

      topics = decoder.array do
        topic_error_code = decoder.int16
        topic_name = decoder.string
        # The is-internal flag is read to advance the decoder but is not stored.
        _is_internal = decoder.boolean

        partitions = decoder.array do
          PartitionMetadata.new(
            partition_error_code: decoder.int16,
            partition_id: decoder.int32,
            leader: decoder.int32,
            replicas: decoder.array { decoder.int32 },
            isr: decoder.array { decoder.int32 },
          )
        end

        TopicMetadata.new(
          topic_error_code: topic_error_code,
          topic_name: topic_name,
          partitions: partitions,
        )
      end

      new(brokers: brokers, controller_id: controller_id, topics: topics)
    end
Instance Method Details
#controller_broker ⇒ Object
    # File 'lib/kafka/protocol/metadata_response.rb', line 123
    def controller_broker
      find_broker(controller_id)
    end
#find_broker(node_id) ⇒ Kafka::BrokerInfo
Finds the broker info for the given node id.
    # File 'lib/kafka/protocol/metadata_response.rb', line 115
    def find_broker(node_id)
      broker = @brokers.find {|b| b.node_id == node_id }

      raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil?

      broker
    end
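The lookup-or-raise behavior shared by find_broker and controller_broker can be tried out in isolation. This is a minimal sketch under stated assumptions: `BrokerStub` and `NoSuchBrokerStub` are hypothetical stand-ins for Kafka::BrokerInfo and Kafka::NoSuchBroker, and `find_broker_in` and `describe_controller` are invented helpers, so nothing here requires the ruby-kafka gem.

```ruby
# Hypothetical stand-ins; not the real Kafka::BrokerInfo or Kafka::NoSuchBroker.
BrokerStub = Struct.new(:node_id, :host, :port, keyword_init: true)
class NoSuchBrokerStub < StandardError; end

# Same shape as find_broker: return the matching broker or raise.
def find_broker_in(brokers, node_id)
  broker = brokers.find { |b| b.node_id == node_id }
  raise NoSuchBrokerStub, "No broker with id #{node_id}" if broker.nil?
  broker
end

# Resolves the controller like controller_broker does, and shows one way
# a caller might handle a controller id that is missing from the broker list.
def describe_controller(brokers, controller_id)
  broker = find_broker_in(brokers, controller_id)
  "#{broker.host}:#{broker.port}"
rescue NoSuchBrokerStub => e
  "controller unknown (#{e.message})"
end

BROKER_LIST = [
  BrokerStub.new(node_id: 1, host: "kafka1", port: 9092),
  BrokerStub.new(node_id: 2, host: "kafka2", port: 9092),
].freeze
```

Because the lookup raises rather than returning nil, callers that can tolerate a missing broker need an explicit rescue, as `describe_controller` does.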
#find_leader_id(topic, partition) ⇒ Integer
Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.
    # File 'lib/kafka/protocol/metadata_response.rb', line 87
    def find_leader_id(topic, partition)
      topic_info = @topics.find {|t| t.topic_name == topic }

      if topic_info.nil?
        raise UnknownTopicOrPartition, "no topic #{topic}"
      end

      Protocol.handle_error(topic_info.topic_error_code)

      partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

      if partition_info.nil?
        raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
      end

      begin
        Protocol.handle_error(partition_info.partition_error_code)
      rescue ReplicaNotAvailable
        # This error can be safely ignored per the protocol specification.
      end

      partition_info.leader
    end
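The control flow of find_leader_id (topic lookup, topic-level error check, partition lookup, partition-level error check that tolerates ReplicaNotAvailable) can be exercised with a self-contained sketch. All names below are hypothetical stand-ins, including `handle_error_stub`, which only approximates Protocol.handle_error. The numeric codes assume the standard Kafka error codes 3 (unknown topic or partition), 5 (leader not available), and 9 (replica not available).

```ruby
# Hypothetical stand-ins for the metadata classes and error types.
PartitionStub = Struct.new(:partition_error_code, :partition_id, :leader,
                           :replicas, :isr, keyword_init: true)
TopicStub = Struct.new(:topic_error_code, :topic_name, :partitions, keyword_init: true)

class UnknownTopicOrPartitionStub < StandardError; end
class LeaderNotAvailableStub < StandardError; end
class ReplicaNotAvailableStub < StandardError; end

# Assumed mapping of Kafka error codes to the stub error classes.
STUB_ERRORS = {
  3 => UnknownTopicOrPartitionStub,
  5 => LeaderNotAvailableStub,
  9 => ReplicaNotAvailableStub,
}.freeze

# Approximates Protocol.handle_error: code 0 means no error.
def handle_error_stub(code)
  return if code.zero?
  raise STUB_ERRORS.fetch(code, StandardError), "error code #{code}"
end

def leader_id_for(topics, topic, partition)
  topic_info = topics.find { |t| t.topic_name == topic }
  raise UnknownTopicOrPartitionStub, "no topic #{topic}" if topic_info.nil?
  handle_error_stub(topic_info.topic_error_code)

  partition_info = topic_info.partitions.find { |p| p.partition_id == partition }
  if partition_info.nil?
    raise UnknownTopicOrPartitionStub, "no partition #{partition} in topic #{topic}"
  end

  begin
    handle_error_stub(partition_info.partition_error_code)
  rescue ReplicaNotAvailableStub
    # Ignored, as in the original method: the leader is still usable.
  end

  partition_info.leader
end

LEADER_TOPICS = [
  TopicStub.new(topic_error_code: 0, topic_name: "events", partitions: [
    PartitionStub.new(partition_error_code: 0, partition_id: 0, leader: 1,
                      replicas: [1, 2], isr: [1, 2]),
    PartitionStub.new(partition_error_code: 9, partition_id: 1, leader: 2,
                      replicas: [2, 3], isr: [2]),
  ]),
].freeze
```

In this sample data, partition 1 carries error code 9, yet the lookup still returns its leader, while an unknown topic or partition raises.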
#partitions_for(topic_name) ⇒ Object
    # File 'lib/kafka/protocol/metadata_response.rb', line 127
    def partitions_for(topic_name)
      topic = @topics.find {|t| t.topic_name == topic_name }

      if topic.nil?
        raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
      end

      Protocol.handle_error(topic.topic_error_code)

      topic.partitions
    end