Class: Kafka::ConsumerGroup::Assignor
- Inherits:
-
Object
- Object
- Kafka::ConsumerGroup::Assignor
- Defined in:
- lib/kafka/consumer_group/assignor.rb
Overview
A consumer group partition assignor
Defined Under Namespace
Classes: Partition
Instance Method Summary collapse
-
#assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>
Assign the topic partitions to the group members.
-
#initialize(cluster:, strategy:) ⇒ Assignor
constructor
A new instance of Assignor.
- #protocol_name ⇒ Object
- #user_data ⇒ Object
Constructor Details
#initialize(cluster:, strategy:) ⇒ Assignor
Returns a new instance of Assignor.
15 16 17 18 |
# File 'lib/kafka/consumer_group/assignor.rb', line 15 def initialize(cluster:, strategy:) @cluster = cluster @strategy = strategy end |
Instance Method Details
#assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>
Assign the topic partitions to the group members.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/kafka/consumer_group/assignor.rb', line 35 def assign(members:, topics:) topic_partitions = topics.flat_map do |topic| begin partition_ids = @cluster.partitions_for(topic).map(&:partition_id) rescue UnknownTopicOrPartition raise UnknownTopicOrPartition, "unknown topic #{topic}" end partition_ids.map {|partition_id| Partition.new(topic, partition_id) } end group_assignment = {} members.each_key do |member_id| group_assignment[member_id] = Protocol::MemberAssignment.new end @strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions| Array(partitions).each do |partition| group_assignment[member_id].assign(partition.topic, [partition.partition_id]) end end group_assignment rescue Kafka::LeaderNotAvailable sleep 1 retry end |
#protocol_name ⇒ Object
20 21 22 |
# File 'lib/kafka/consumer_group/assignor.rb', line 20 def protocol_name @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s end |
#user_data ⇒ Object
24 25 26 |
# File 'lib/kafka/consumer_group/assignor.rb', line 24 def user_data @strategy.user_data if @strategy.respond_to?(:user_data) end |