Class: Kafka::EC2::MixedInstanceAssignmentStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/ec2/mixed_instance_assignment_strategy.rb

Constant Summary collapse

DELIMITER =
","

Instance Method Summary collapse

Constructor Details

#initialize(instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {}) ⇒ MixedInstanceAssignmentStrategy

Returns a new instance of MixedInstanceAssignmentStrategy.

Parameters:

  • instance_family_weights (Hash{String => Numeric}, Proc) (defaults to: {})

    a hash whose key is the instance family and whose value is the weight. If the object is a proc, it must return such a hash; the proc is called every time the method “call” is called.

  • availability_zone_weights (Hash{String => Numeric}, Proc) (defaults to: {})

    a hash whose key is the availability zone and whose value is the weight. If the object is a proc, it must return such a hash; the proc is called every time the method “call” is called.

  • weights (Hash{String => Hash{String => Numeric}}, Proc) (defaults to: {})

    a hash whose key is the availability zone or the instance family and whose value is a hash like instance_family_weights or availability_zone_weights. If the object is a proc, it must return such a hash; the proc is called every time the method “call” is called.



24
25
26
27
28
29
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 24

# Captures the weighting configuration consulted by #call.
#
# Every argument is stored exactly as given. When instance_family_weights,
# availability_zone_weights or weights is a Proc, #call invokes it on each
# assignment to obtain the current Hash.
#
# @param instance_family_weights [Hash{String => Numeric}, Proc] instance family => weight
# @param availability_zone_weights [Hash{String => Numeric}, Proc] availability zone => weight
# @param weights [Hash{String => Hash{String => Numeric}}, Proc] nested az/family weights
# @param partition_weights [Hash, Proc] partition weighting configuration, stored as-is
def initialize(instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {})
  @instance_family_weights, @availability_zone_weights, @weights =
    instance_family_weights, availability_zone_weights, weights
  @partition_weights = partition_weights
end

Instance Method Details

#call(cluster:, members:, partitions:) ⇒ Hash{String => Array}

Assign the topic partitions to the group members.

Parameters:

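  • cluster (Object)

    not used by this strategy

  • members (Hash{String => Object})

    member ids mapped to their group metadata; each metadata object's user_data must be "instance_id,instance_type,availability_zone", as produced by #user_data

  • partitions (Array)

    the partitions to assign; each must respond to topic and partition_id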

Returns:

  • (Hash{String => Array})

    a hash mapping member ids to the partitions assigned to them.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 51

def call(cluster:, members:, partitions:)
  member_id_to_partitions = Hash.new { |h, k| h[k] = [] }
  instance_id_to_capacity = Hash.new(0)
  instance_id_to_member_ids = Hash.new { |h, k| h[k] = [] }
  total_capacity = 0
  member_id_to_instance_id = {}

  instance_family_to_capacity = @instance_family_weights.is_a?(Proc) ? @instance_family_weights.call() : @instance_family_weights
  az_to_capacity = @availability_zone_weights.is_a?(Proc) ? @availability_zone_weights.call() : @availability_zone_weights
  weights = @weights.is_a?(Proc) ? @weights.call() : @weights
  members.each do |member_id, |
    instance_id, instance_type, az = .user_data.split(DELIMITER)
    instance_id_to_member_ids[instance_id] << member_id
    member_id_to_instance_id[member_id] = instance_id
    capacity = calculate_capacity(instance_type, az, instance_family_to_capacity, az_to_capacity, weights)
    instance_id_to_capacity[instance_id] += capacity
    total_capacity += capacity
  end

  partition_weights = build_partition_weights(partitions)
  partition_weight_per_capacity = partitions.sum { |partition| partition_weights.dig(partition.topic, partition.partition_id) } / total_capacity

  last_index = 0
  member_id_to_acceptable_partition_weight = {}
  instance_id_to_total_acceptable_partition_weight = Hash.new(0)
  instance_id_to_capacity.each do |instance_id, capacity|
    member_ids = instance_id_to_member_ids[instance_id]
    member_ids.each do |member_id|
      acceptable_partition_weight = capacity * partition_weight_per_capacity / member_ids.size
      while last_index < partitions.size
        partition = partitions[last_index]
        partition_weight = partition_weights.dig(partition.topic, partition.partition_id)
        break if acceptable_partition_weight - partition_weight < 0

        member_id_to_partitions[member_id] << partition
        acceptable_partition_weight -= partition_weight

        last_index += 1
      end

      member_id_to_acceptable_partition_weight[member_id] = acceptable_partition_weight
      instance_id_to_total_acceptable_partition_weight[instance_id] += acceptable_partition_weight
    end
  end

  while last_index < partitions.size
    max_acceptable_partition_weight = member_id_to_acceptable_partition_weight.values.max
    member_ids = member_id_to_acceptable_partition_weight.select { |_, w| w == max_acceptable_partition_weight }.keys
    if member_ids.size == 1
      member_id = member_ids.first
    else
      member_id =  member_ids.max_by { |id| instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[id]] }
    end
    partition = partitions[last_index]
    member_id_to_partitions[member_id] << partition

    partition_weight = partition_weights.dig(partition.topic, partition.partition_id)
    member_id_to_acceptable_partition_weight[member_id] -= partition_weight
    instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[member_id]] -= partition_weight

    last_index += 1
  end

  member_id_to_partitions
rescue Kafka::LeaderNotAvailable
  sleep 1
  retry
end

#protocol_nameObject



31
32
33
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 31

# Identifier of this assignment strategy.
# NOTE(review): presumably advertised to the group coordinator as the
# assignment protocol name; all members must agree on it — confirm with caller.
#
# @return [String] the constant protocol name
def protocol_name
  'mixedinstance'
end

#user_dataObject



35
36
37
38
39
40
41
42
43
# File 'lib/kafka/ec2/mixed_instance_assignment_strategy.rb', line 35

# Describes the EC2 instance this process runs on, for use as consumer group
# member user data.
#
# Queries the EC2 instance metadata service at 169.254.169.254 (plain GETs,
# no session token) for the instance id, instance type and availability zone.
# It joins them with DELIMITER, which is the format #call splits apart.
#
# @return [String] "instance_id,instance_type,availability_zone"
# @raise [Net::HTTPExceptions] if the metadata service answers with a non-2xx
#   status, rather than silently embedding an error body in the user data
def user_data
  Net::HTTP.start("169.254.169.254", 80) do |http|
    fetch = lambda do |path|
      response = http.get(path)
      response.value # raises unless the response is a 2xx success
      response.body
    end

    [
      fetch.call("/latest/meta-data/instance-id"),
      fetch.call("/latest/meta-data/instance-type"),
      fetch.call("/latest/meta-data/placement/availability-zone"),
    ].join(DELIMITER)
  end
end