Class: Kafka::EC2::MixedInstanceAssignmentStrategy
- Inherits:
-
Object
- Object
- Kafka::EC2::MixedInstanceAssignmentStrategy
- Defined in:
- lib/kafka/ec2/mixed_instance_assignment_strategy.rb
Constant Summary
- DELIMITER =
","
Instance Method Summary
-
#call(cluster:, members:, partitions:) ⇒ Hash{String => Protocol::MemberAssignment}
Assign the topic partitions to the group members.
-
#initialize(instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {}) ⇒ MixedInstanceAssignmentStrategy
constructor
A new instance of MixedInstanceAssignmentStrategy.
- #protocol_name ⇒ Object
- #user_data ⇒ Object
Constructor Details
#initialize(instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {}) ⇒ MixedInstanceAssignmentStrategy
Returns a new instance of MixedInstanceAssignmentStrategy.
24 25 26 27 28 29 |
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 24
#
# Returns a new instance of MixedInstanceAssignmentStrategy.
#
# The arguments are stored as given, without validation or copying. `#call`
# accepts either a plain value or a Proc for the first three: when the stored
# object is a Proc it is invoked with no arguments on every `#call` and its
# return value is used instead.
#
# @param instance_family_weights [Hash, Proc] handed to the capacity
#   calculation as the per-instance-family weights
# @param availability_zone_weights [Hash, Proc] handed to the capacity
#   calculation as the per-availability-zone weights
# @param weights [Hash, Proc] handed to the capacity calculation as the
#   combined weights
# @param partition_weights [Object] stored in @partition_weights; presumably a
#   Hash (or Proc) of per-topic, per-partition weights -- its consumer is not
#   part of this method, confirm against the helper that reads it
def initialize(instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {})
  @instance_family_weights = instance_family_weights
  @availability_zone_weights = availability_zone_weights
  @weights = weights
  @partition_weights = partition_weights
end
Instance Method Details
#call(cluster:, members:, partitions:) ⇒ Hash{String => Protocol::MemberAssignment}
Assign the topic partitions to the group members.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 51
#
# Assign the topic partitions to the group members.
#
# Every member advertises "instance-id,instance-type,availability-zone" as its
# user data. A capacity is computed per member and accumulated per instance;
# partitions are then handed out in their given order so that each member
# receives a share of the total partition weight proportional to its
# instance's capacity, split evenly between the members on that instance.
#
# @param cluster [Object] not used by this method
# @param members [Hash{String => #user_data}] member id => metadata whose
#   `user_data` is the DELIMITER-joined string described above
# @param partitions [Array<#topic, #partition_id>] partitions to distribute
# @return [Hash{String => Array}] member id => partitions assigned to it;
#   empty when there are no members
def call(cluster:, members:, partitions:)
  member_id_to_partitions = Hash.new { |h, k| h[k] = [] }
  # Without members there is nobody to assign to, and the total capacity
  # would be zero, making the division below fail.
  return member_id_to_partitions if members.empty?

  instance_id_to_capacity = Hash.new(0)
  instance_id_to_member_ids = Hash.new { |h, k| h[k] = [] }
  total_capacity = 0
  member_id_to_instance_id = {}

  # Each weight source may be a Proc, re-evaluated on every assignment.
  instance_family_to_capacity = @instance_family_weights.is_a?(Proc) ? @instance_family_weights.call : @instance_family_weights
  az_to_capacity = @availability_zone_weights.is_a?(Proc) ? @availability_zone_weights.call : @availability_zone_weights
  weights = @weights.is_a?(Proc) ? @weights.call : @weights

  members.each do |member_id, metadata|
    instance_id, instance_type, az = metadata.user_data.split(DELIMITER)
    instance_id_to_member_ids[instance_id] << member_id
    member_id_to_instance_id[member_id] = instance_id
    capacity = calculate_capacity(instance_type, az, instance_family_to_capacity, az_to_capacity, weights)
    # An instance's capacity is the sum over all members running on it.
    instance_id_to_capacity[instance_id] += capacity
    total_capacity += capacity
  end

  partition_weights = build_partition_weights(partitions)
  partition_weight_per_capacity =
    partitions.sum { |partition| partition_weights.dig(partition.topic, partition.partition_id) } / total_capacity

  # First pass: walk the partitions once, filling each member up to its quota.
  last_index = 0
  member_id_to_acceptable_partition_weight = {}
  instance_id_to_total_acceptable_partition_weight = Hash.new(0)
  instance_id_to_capacity.each do |instance_id, capacity|
    member_ids = instance_id_to_member_ids[instance_id]
    member_ids.each do |member_id|
      acceptable_partition_weight = capacity * partition_weight_per_capacity / member_ids.size
      while last_index < partitions.size
        partition = partitions[last_index]
        partition_weight = partition_weights.dig(partition.topic, partition.partition_id)
        # Stop as soon as the next partition would exceed this member's quota.
        break if acceptable_partition_weight - partition_weight < 0

        member_id_to_partitions[member_id] << partition
        acceptable_partition_weight -= partition_weight
        last_index += 1
      end

      # Remember the unused quota for the second pass.
      member_id_to_acceptable_partition_weight[member_id] = acceptable_partition_weight
      instance_id_to_total_acceptable_partition_weight[instance_id] += acceptable_partition_weight
    end
  end

  # Second pass: give each leftover partition to the member with the most
  # remaining quota; ties go to the member whose instance has the most
  # remaining quota in total.
  while last_index < partitions.size
    max_acceptable_partition_weight = member_id_to_acceptable_partition_weight.values.max
    member_ids = member_id_to_acceptable_partition_weight.select { |_, w| w == max_acceptable_partition_weight }.keys
    if member_ids.size == 1
      member_id = member_ids.first
    else
      member_id = member_ids.max_by { |id| instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[id]] }
    end
    partition = partitions[last_index]
    member_id_to_partitions[member_id] << partition

    partition_weight = partition_weights.dig(partition.topic, partition.partition_id)
    member_id_to_acceptable_partition_weight[member_id] -= partition_weight
    instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[member_id]] -= partition_weight

    last_index += 1
  end

  member_id_to_partitions
rescue Kafka::LeaderNotAvailable
  # NOTE(review): this retries without any upper bound; if the error can
  # persist, consider limiting the number of attempts.
  sleep 1
  retry
end
#protocol_name ⇒ Object
31 32 33 |
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 31
#
# Name under which this assignment strategy identifies itself.
#
# @return [String] always "mixedinstance"
def protocol_name
  "mixedinstance"
end
#user_data ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 35
#
# Builds this member's user data from the EC2 instance metadata service:
# the instance id, instance type and availability zone joined by DELIMITER.
#
# An IMDSv2 session token is requested first and, when obtained, sent with
# every metadata request so the lookup also works on instances that only
# accept IMDSv2. If no token can be obtained the requests are made without
# one, as before.
#
# @return [String] "instance-id,instance-type,availability-zone"
def user_data
  host = "169.254.169.254"

  token =
    begin
      # Short timeouts and no retries: when the token endpoint is unreachable
      # (e.g. the PUT response is dropped by the metadata hop limit) we must
      # fall back quickly rather than stall.
      Net::HTTP.start(host, 80, open_timeout: 1, read_timeout: 1, max_retries: 0) do |http|
        response = http.put("/latest/api/token", "", "X-aws-ec2-metadata-token-ttl-seconds" => "60")
        response.body if response.is_a?(Net::HTTPSuccess)
      end
    rescue StandardError
      # The token is a best-effort upgrade; any failure here falls through to
      # the tokenless requests below, which report real connectivity errors.
      nil
    end
  headers = token ? { "X-aws-ec2-metadata-token" => token } : {}

  Net::HTTP.start(host, 80) do |http|
    [
      http.get("/latest/meta-data/instance-id", headers).body,
      http.get("/latest/meta-data/instance-type", headers).body,
      http.get("/latest/meta-data/placement/availability-zone", headers).body,
    ].join(DELIMITER)
  end
end