Class: Kafka::ConsumerGroup::Assignor

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/consumer_group/assignor.rb

Overview

A consumer group partition assignor

Defined Under Namespace

Classes: Partition

Instance Method Summary collapse

Constructor Details

#initialize(cluster:, strategy:) ⇒ Assignor

Returns a new instance of Assignor.

Parameters:

  • cluster (Kafka::Cluster)
  • strategy (Object)

    an object that implements #protocol_type,

    user_data, and #assign.



15
16
17
18
# File 'lib/kafka/consumer_group/assignor.rb', line 15

def initialize(cluster:, strategy:)
  @cluster = cluster
  @strategy = strategy
end

Instance Method Details

#assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>

Assign the topic partitions to the group members.

Parameters:

Returns:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/kafka/consumer_group/assignor.rb', line 35

def assign(members:, topics:)
  topic_partitions = topics.flat_map do |topic|
    begin
      partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end
    partition_ids.map {|partition_id| Partition.new(topic, partition_id) }
  end

  group_assignment = {}

  members.each_key do |member_id|
    group_assignment[member_id] = Protocol::MemberAssignment.new
  end
  @strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
    Array(partitions).each do |partition|
      group_assignment[member_id].assign(partition.topic, [partition.partition_id])
    end
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  sleep 1
  retry
end

#protocol_nameObject



20
21
22
# File 'lib/kafka/consumer_group/assignor.rb', line 20

def protocol_name
  @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
end

#user_dataObject



24
25
26
# File 'lib/kafka/consumer_group/assignor.rb', line 24

def user_data
  @strategy.user_data if @strategy.respond_to?(:user_data)
end