Class: Kafka::RoundRobinAssignmentStrategy
- Inherits: Object
  - Object
  - Kafka::RoundRobinAssignmentStrategy
- Defined in: lib/kafka/round_robin_assignment_strategy.rb
Overview
A consumer group partition assignment strategy that assigns partitions to consumers in a round-robin fashion.
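As a rough illustration of what "round-robin" means here, the sketch below distributes topic partitions across members by taking each partition's position in the flattened list modulo the member count. It is a standalone approximation in plain Ruby, not the library's code: `round_robin_sketch`, the member ids, and the topic names are hypothetical, and plain arrays stand in for `Protocol::MemberAssignment`.

```ruby
# Hypothetical helper, not part of ruby-kafka: spreads [topic, partition]
# pairs across members by list index modulo the member count.
def round_robin_sketch(members, partitions_by_topic)
  # Flatten to one ordered list of [topic, partition] pairs.
  topic_partitions = partitions_by_topic.flat_map do |topic, partitions|
    partitions.map { |partition| [topic, partition] }
  end

  # Every member gets an entry, even if it ends up with no partitions.
  assignment = members.to_h { |member_id| [member_id, []] }

  topic_partitions.each_with_index do |pair, index|
    assignment[members[index % members.size]] << pair
  end

  assignment
end

# Assumed example data: two consumers, five partitions over two topics.
members = ["consumer-a", "consumer-b"]
partitions_by_topic = { "orders" => [0, 1, 2], "payments" => [0, 1] }

round_robin_sketch(members, partitions_by_topic).each do |member_id, pairs|
  puts "#{member_id}: #{pairs.inspect}"
end
```

Because the index runs over the combined list rather than restarting per topic, the partitions of a single topic can end up split unevenly between members while the overall count stays balanced to within one partition.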
Instance Method Summary
- #assign(members:, topics:) ⇒ Hash<String, Protocol::MemberAssignment>
  Assign the topic partitions to the group members.
- #initialize(cluster:) ⇒ RoundRobinAssignmentStrategy (constructor)
  A new instance of RoundRobinAssignmentStrategy.
Constructor Details
#initialize(cluster:) ⇒ RoundRobinAssignmentStrategy
Returns a new instance of RoundRobinAssignmentStrategy.
    # File 'lib/kafka/round_robin_assignment_strategy.rb', line 10
    def initialize(cluster:)
      @cluster = cluster
    end
Instance Method Details
#assign(members:, topics:) ⇒ Hash<String, Protocol::MemberAssignment>
Assign the topic partitions to the group members.
    # File 'lib/kafka/round_robin_assignment_strategy.rb', line 20
    def assign(members:, topics:)
      group_assignment = {}

      members.each do |member_id|
        group_assignment[member_id] = Protocol::MemberAssignment.new
      end

      topic_partitions = topics.flat_map do |topic|
        begin
          partitions = @cluster.partitions_for(topic).map(&:partition_id)
        rescue UnknownTopicOrPartition
          raise UnknownTopicOrPartition, "unknown topic #{topic}"
        end
        Array.new(partitions.count) { topic }.zip(partitions)
      end

      partitions_per_member = topic_partitions.group_by.with_index do |_, index|
        index % members.count
      end.values

      members.zip(partitions_per_member).each do |member_id, member_partitions|
        unless member_partitions.nil?
          member_partitions.each do |topic, partition|
            group_assignment[member_id].assign(topic, [partition])
          end
        end
      end

      group_assignment
    rescue Kafka::LeaderNotAvailable
      sleep 1
      retry
    end
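The method-level `rescue` in #assign re-runs the whole method body after a one-second pause whenever Kafka::LeaderNotAvailable is raised, with no limit on the number of attempts. The sketch below shows the same `rescue`/`retry` idiom in isolation. It is an illustration only: `LeaderNotAvailableSketch` and `with_leader_retry` are hypothetical names, and the attempt cap is an assumption added for the example that the method above does not have.

```ruby
# Hypothetical stand-in for Kafka::LeaderNotAvailable so the sketch runs
# without the ruby-kafka gem.
class LeaderNotAvailableSketch < StandardError; end

# Yields the current attempt number to the block. On a transient error the
# method body is re-run from the top via `retry`. Unlike #assign, this sketch
# re-raises once max_attempts is reached (an assumption for illustration).
def with_leader_retry(max_attempts: 3, pause: 0)
  attempts ||= 0 # local variables keep their value across `retry`
  attempts += 1
  yield attempts
rescue LeaderNotAvailableSketch
  raise if attempts >= max_attempts
  sleep pause
  retry
end

# Assumed scenario: the leader becomes available on the third attempt.
message = with_leader_retry do |attempt|
  raise LeaderNotAvailableSketch, "no leader yet" if attempt < 3
  "assigned on attempt #{attempt}"
end
puts message
```

Retrying without a cap keeps the caller blocked until the cluster elects a leader. A cap like the one sketched here trades that persistence for a bounded wait.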