Class: Kafkat::Command::Drain
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#build_partitions_by_broker(topic, destination_brokers) ⇒ Object
Build a hash map from broker id to number of partitions on it to facilitate finding the broker with lowest number of partitions to help balance brokers.
- #generate_assignments(source_broker, topics, destination_brokers) ⇒ Object
-
#run ⇒ Object
For each partition (of specified topic) on the source broker, the command is to assign the partition to one of the destination brokers that does not already have this partition, along with existing brokers to achieve minimal movement of data.
Methods inherited from Base
#admin, #initialize, #kafka_logs, register_as, usage, usages, #zookeeper
Methods included from Logging
Methods included from Kafkat::CommandIO
#prompt_and_execute_assignments
Methods included from Formatting
#justify, #print_assignment, #print_assignment_header, #print_broker, #print_broker_header, #print_partition, #print_partition_header, #print_topic, #print_topic_header, #print_topic_name
Constructor Details
This class inherits a constructor from Kafkat::Command::Base
Instance Method Details
#build_partitions_by_broker(topic, destination_brokers) ⇒ Object
Build a hash map from broker id to number of partitions on it to facilitate finding the broker with lowest number of partitions to help balance brokers.
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/kafkat/command/drain.rb', line 97 def build_partitions_by_broker(topic, destination_brokers) partitions_by_broker = Hash.new(0) destination_brokers.each { |id| partitions_by_broker[id] = 0 } topic.partitions.each do |p| p.replicas.each do |r| partitions_by_broker[r] += 1 end end partitions_by_broker end |
#generate_assignments(source_broker, topics, destination_brokers) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/kafkat/command/drain.rb', line 55 def generate_assignments(source_broker, topics, destination_brokers) assignments = [] topics.each do |_, t| partitions_by_broker = build_partitions_by_broker(t, destination_brokers) t.partitions.each do |p| if p.replicas.include? source_broker replicas_size = p.replicas.length replicas = p.replicas - [source_broker] source_broker_is_leader = p.replicas.first == source_broker potential_broker_ids = destination_brokers - replicas if potential_broker_ids.empty? print "ERROR: Not enough destination brokers to reassign topic \"#{t.name}\".\n" exit 1 end num_partitions_on_potential_broker = partitions_by_broker.select { |id, _| potential_broker_ids.include? id } assigned_broker_id = num_partitions_on_potential_broker.min_by{ |id, num| num }[0] if source_broker_is_leader replicas.unshift(assigned_broker_id) else replicas << assigned_broker_id end partitions_by_broker[assigned_broker_id] += 1 if replicas.length != replicas_size STDERR.print "ERROR: Number of replicas changes after reassignment topic: #{t.name}, partition: #{p.id} \n" exit 1 end assignments << Assignment.new(t.name, p.id, replicas) end end end assignments end |
#run ⇒ Object
For each partition (of specified topic) on the source broker, the command is to assign the partition to one of the destination brokers that does not already have this partition, along with existing brokers to achieve minimal movement of data. To help distribute data evenly, if there are more than one destination brokers meet the requirement, the command will always choose the brokers with the lowest number of partitions of the involving topic.
In order to find out the broker with lowest number of partitions, the command maintain a hash table with broker id as key and number of partitions as value. The hash table will be updated along with assignment.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/kafkat/command/drain.rb', line 20 def run source_broker = ARGV[0] && ARGV.shift.to_i if source_broker.nil? puts "You must specify a broker ID." exit 1 end opts = Trollop. do opt :brokers, "destination broker IDs", type: :string opt :topic, "topic name to reassign", type: :string end topic_name = opts[:topic] topics = topic_name && zookeeper.get_topics([topic_name]) topics ||= zookeeper.get_topics destination_brokers = opts[:brokers] && opts[:brokers].split(',').map(&:to_i) destination_brokers ||= zookeeper.get_brokers.values.map(&:id) destination_brokers.delete(source_broker) active_brokers = zookeeper.get_brokers.values.map(&:id) unless (inactive_brokers = destination_brokers - active_brokers).empty? print "ERROR: Broker #{inactive_brokers} are not currently active.\n" exit 1 end assignments = generate_assignments(source_broker, topics, destination_brokers) print "Num of topics got from zookeeper: #{topics.length}\n" print "Num of partitions in the assignment: #{assignments.size}\n" prompt_and_execute_assignments(assignments) end |