Class: Kafkat::Command::SetReplicationFactor
- Defined in:
- lib/kafkat/command/set-replication-factor.rb
Overview
Command to set the replication factor (RF) of a topic. The command accepts the topic name, the desired new replication factor and, when the replication factor is being increased, a list of broker ids.
When reducing the RF, a new partition assignment is generated by removing the last replica from every replica set. The leader replica is never removed from the replica set, which avoids triggering a leader election.
When increasing the RF, a new partition assignment is generated by adding new replicas to every replica set. The new replicas are assigned to the provided brokers in a round-robin fashion. If no broker id is specified on the command line, all brokers are used.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #increase_rf(topic, current_rf, new_rf, brokers) ⇒ Object
  For every partition, filter out the brokers that already have a replica for this partition, then pick (new_rf - current_rf) brokers and assign them new replicas.
- #reduce_rf(topic, current_rf, new_rf) ⇒ Object
  For every partition, remove the last replica from the replica list.
- #run ⇒ Object
- #warn_reduce_brokers ⇒ Object
Methods inherited from Base
#admin, #initialize, #kafka_logs, register_as, usage, usages, #zookeeper
Methods included from Logging
Methods included from Kafkat::CommandIO
#prompt_and_execute_assignments
Methods included from Formatting
#justify, #print_assignment, #print_assignment_header, #print_broker, #print_broker_header, #print_partition, #print_partition_header, #print_topic, #print_topic_header, #print_topic_name
Constructor Details
This class inherits a constructor from Kafkat::Command::Base
Instance Method Details
#increase_rf(topic, current_rf, new_rf, brokers) ⇒ Object
For every partition, filter out the brokers that already have a replica for this partition, then pick (new_rf - current_rf) brokers and assign them new replicas.
The count of new replicas assigned to the brokers is maintained in order to uniformly assign new replicas.
    # File 'lib/kafkat/command/set-replication-factor.rb', line 135

    def increase_rf(topic, current_rf, new_rf, brokers)
      unless new_rf > current_rf
        raise 'New replication factor must be greater than the current replication factor'
      end

      delta_rf = new_rf - current_rf
      if delta_rf > brokers.size
        # No partition is in scope here, so the topic name comes from `topic`.
        raise "#{delta_rf} new replicas requested for topic #{topic.name} but only #{brokers.size} brokers available"
      end

      # Number of new replicas handed to each broker so far.
      broker_counts = brokers.map { |b| {:id => b, :count => 0} }

      assignments = []
      topic.partitions.each do |p|
        existing_replicas = p.replicas
        # Brokers that already hold a replica of this partition are not eligible.
        pick_from = broker_counts.reject { |b| existing_replicas.include?(b[:id]) }
        if delta_rf > pick_from.size
          raise "Cannot create #{delta_rf} new replicas for partition #{p.topic_name}.#{p.id}, not enough brokers"
        end

        # Pick the brokers with the fewest new replicas, then record the picks.
        new_replicas = pick_from.sort { |a, b| a[:count] <=> b[:count] }[0...delta_rf]
        new_replicas.each { |b| b[:count] += 1 }

        final_replicas = existing_replicas + new_replicas.map { |b| b[:id] }

        assignments << Assignment.new(topic.name, p.id, final_replicas)
      end
      assignments
    end
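The selection logic above can be tried in isolation. The following is a minimal sketch, not the kafkat implementation: `Partition` is a hypothetical stand-in struct, `sketch_increase` is an illustrative name, and the index-based tie-break is an assumption added to make the result deterministic (the original uses a plain `sort`, which does not guarantee an order among equal counts).

```ruby
# Hypothetical stand-in for a partition; only the fields used by the sketch.
Partition = Struct.new(:id, :replicas)

# Returns { partition_id => new replica list } after adding delta_rf replicas
# to every partition, preferring brokers with the fewest new replicas so far.
def sketch_increase(partitions, delta_rf, brokers)
  counts = brokers.map { |b| { id: b, count: 0 } }
  partitions.each_with_object({}) do |part, result|
    # Brokers already hosting this partition are not eligible.
    candidates = counts.reject { |c| part.replicas.include?(c[:id]) }
    if delta_rf > candidates.size
      raise ArgumentError, "not enough brokers for partition #{part.id}"
    end

    # Least-loaded first; ties are broken by position (assumption of this sketch).
    chosen = candidates.each_with_index
                       .sort_by { |c, i| [c[:count], i] }
                       .first(delta_rf)
                       .map(&:first)
    chosen.each { |c| c[:count] += 1 }
    result[part.id] = part.replicas + chosen.map { |c| c[:id] }
  end
end

partitions = [
  Partition.new(0, [1, 2]),
  Partition.new(1, [1, 2]),
  Partition.new(2, [1, 2]),
]
p sketch_increase(partitions, 1, [3, 4])
# => {0=>[1, 2, 3], 1=>[1, 2, 4], 2=>[1, 2, 3]}
```

With two candidate brokers and three partitions, the new replicas alternate between brokers 3 and 4, which is the uniform spread the count bookkeeping is meant to produce.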
#reduce_rf(topic, current_rf, new_rf) ⇒ Object
For every partition, remove the last replica from the replica list. If the last replica is the leader, then the previous replica is removed instead.
    # File 'lib/kafkat/command/set-replication-factor.rb', line 99

    def reduce_rf(topic, current_rf, new_rf)
      delta_rf = current_rf - new_rf
      if current_rf == 1
        raise 'Current replication factor is 1. Cannot reduce further.'
      end
      unless delta_rf > 0
        raise "New replication factor (#{new_rf}) must be smaller than current replication factor (#{current_rf})"
      end

      assignments = []
      topic.partitions.each do |p|
        new_replicas = p.replicas

        # Remove delta_rf replicas, one per pass.
        (0...delta_rf).each do |_|
          # Walk the list from the end and drop the first non-leader replica found.
          (0...new_replicas.size).each do |i|
            if new_replicas[new_replicas.size-1-i] != p.leader
              new_replicas.delete_at(new_replicas.size-1-i)
              break
            end
          end
        end

        if new_replicas.size != new_rf
          raise 'Unexpected state'
        end

        assignments << Assignment.new(topic.name, p.id, new_replicas)
      end
      assignments
    end
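The leader-preserving removal can be illustrated on plain arrays. This is a hedged sketch rather than the kafkat code: `sketch_reduce` is an illustrative name, it takes the replica list and leader id directly instead of a partition object, and it works on a copy of the list, which is a choice of this sketch.

```ruby
# Drops delta_rf replicas from the end of the list, never removing the leader.
def sketch_reduce(replicas, leader, delta_rf)
  kept = replicas.dup # copy, so the caller's list is left untouched
  delta_rf.times do
    # Index of the last replica that is not the leader.
    idx = kept.rindex { |r| r != leader }
    raise ArgumentError, 'no non-leader replica left to remove' if idx.nil?

    kept.delete_at(idx)
  end
  kept
end

p sketch_reduce([1, 2, 3], 1, 1) # => [1, 2]
p sketch_reduce([1, 2, 3], 3, 1) # => [1, 3]
p sketch_reduce([1, 2, 3], 2, 2) # => [2]
```

In the second call the leader is the last replica, so the one before it is removed instead. In the third call both non-leader replicas are removed and only the leader remains.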
#run ⇒ Object
    # File 'lib/kafkat/command/set-replication-factor.rb', line 25

    def run
      # The topic name is optional; without it, every topic is processed.
      topic_name = ARGV.shift unless ARGV[0] && ARGV[0].start_with?('--')

      all_brokers = zookeeper.get_brokers
      topics = topic_name && zookeeper.get_topics([topic_name])
      topics ||= zookeeper.get_topics

      opts = Trollop.options do
        opt :brokers, "the comma-separated list of broker the new partitions must be assigned to", type: :string
        opt :newrf, "the new replication factor", type: :integer, required: true
      end

      broker_ids = opts[:brokers] && opts[:brokers].split(',').map(&:to_i)
      new_rf = opts[:newrf]

      if new_rf < 1
        puts "ERROR: replication factor is smaller than 1"
        exit 1
      end

      # Default to all brokers when none are given on the command line.
      broker_ids ||= zookeeper.get_brokers.values.map(&:id)

      all_brokers_id = all_brokers.values.map(&:id)
      broker_ids.each do |id|
        if !all_brokers_id.include?(id)
          puts "ERROR: Broker #{id} is not currently active.\n"
          exit 1
        end
      end

      broker_count = broker_ids.size
      if new_rf > broker_count
        puts "ERROR: Replication factor is larger than number of brokers.\n"
        exit 1
      end

      assignments = []
      topics.each do |_, t|
        # The RF of the first partition is taken as the RF of the topic.
        current_rf = t.partitions[0].replicas.size
        if new_rf < current_rf
          warn_reduce_brokers if opts[:brokers]
          assignments += reduce_rf(t, current_rf, new_rf)
        elsif new_rf > current_rf
          assignments += increase_rf(t, current_rf, new_rf, broker_ids)
        end
      end

      # ****************

      if assignments.empty?
        puts "No partition reassignment required"
      else
        print "This operation executes the following assignments:\n\n"
        print_assignment_header
        assignments.each { |a| print_assignment(a) }
        print "\n"

        return unless agree("Proceed (y/n)?")

        result = nil
        begin
          print "\nBeginning.\n"
          result = admin.reassign!(assignments)
          print "Started.\n"
        rescue Admin::ExecutionFailedError
          print result
        end
      end
    end
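The input checks performed by #run before any assignment is computed can be summarized as a small pure function. This is a sketch under stated assumptions: `resolve_broker_ids` is a hypothetical helper that does not exist in kafkat, and it raises `ArgumentError` where the command prints an error and exits, so that the checks can be exercised without ZooKeeper.

```ruby
# brokers_opt: raw value of --brokers (for example "1,2,3") or nil
# new_rf:      requested replication factor
# active_ids:  ids of the brokers currently registered
# Returns the broker ids that new replicas may be assigned to.
def resolve_broker_ids(brokers_opt, new_rf, active_ids)
  raise ArgumentError, 'replication factor is smaller than 1' if new_rf < 1

  # Fall back to every active broker when no list is given.
  ids = brokers_opt ? brokers_opt.split(',').map(&:to_i) : active_ids

  inactive = ids - active_ids
  unless inactive.empty?
    raise ArgumentError, "Broker #{inactive.first} is not currently active"
  end

  if new_rf > ids.size
    raise ArgumentError, 'replication factor is larger than number of brokers'
  end

  ids
end

p resolve_broker_ids('1,2', 2, [1, 2, 3]) # => [1, 2]
p resolve_broker_ids(nil, 3, [1, 2, 3])   # => [1, 2, 3]
```

Note that the replication factor is compared against the size of the selected broker list, not against the total number of active brokers, which mirrors the order of the checks in #run.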
#warn_reduce_brokers ⇒ Object
    # File 'lib/kafkat/command/set-replication-factor.rb', line 164

    def warn_reduce_brokers
      return if @did_warn_reduce_brokers
      puts "When reducing the replication factor the list of specified brokers is ignored."
      puts "Once the replication factor is set, you can use the reassign command."
      @did_warn_reduce_brokers = true
    end