Class: Kafkat::Command::SetReplicationFactor

Inherits:
Base
  • Object
Defined in:
lib/kafkat/command/set-replication-factor.rb

Overview

Command to set the replication factor (RF) of a topic. The command accepts the topic name, the desired replication factor, and, when the replication factor is being increased, a list of broker ids.

When reducing the RF, a new partition assignment is generated by removing the last replica from every replica set. The partition leader is never removed from the replica set, which avoids triggering a leader election.

When increasing the RF, a new partition assignment is generated by adding replicas to every replica set. The new replicas are assigned to the provided brokers in a round-robin fashion. If no broker id is specified on the command line, all brokers are used.

Instance Attribute Summary

Attributes inherited from Base

#config

Instance Method Summary

Methods inherited from Base

#admin, #initialize, #kafka_logs, register_as, usage, usages, #zookeeper

Methods included from Logging

#print_err

Methods included from Kafkat::CommandIO

#prompt_and_execute_assignments

Methods included from Formatting

#justify, #print_assignment, #print_assignment_header, #print_broker, #print_broker_header, #print_partition, #print_partition_header, #print_topic, #print_topic_header, #print_topic_name

Constructor Details

This class inherits a constructor from Kafkat::Command::Base

Instance Method Details

#increase_rf(topic, current_rf, new_rf, brokers) ⇒ Object

For every partition, filter out the brokers that already have a replica for this partition, then pick (new_rf - current_rf) brokers and assign them new replicas.

The count of new replicas assigned to the brokers is maintained in order to uniformly assign new replicas.
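
The selection rule described above can be sketched in isolation. This is a minimal illustration, not the class's own code: `PartitionSketch` and `spread_new_replicas` are hypothetical names, and the tie-break on broker id is an assumption added here so that the result is deterministic.

```ruby
# Hypothetical stand-in for a partition; only the fields this sketch needs.
PartitionSketch = Struct.new(:id, :replicas)

# Returns { partition_id => replica list } after adding `delta` replicas
# to every partition, always preferring the brokers that have received
# the fewest new replicas so far.
def spread_new_replicas(partitions, brokers, delta)
  counts = brokers.map { |id| [id, 0] }.to_h  # new replicas given to each broker
  partitions.each_with_object({}) do |part, plan|
    candidates = brokers - part.replicas      # brokers without a replica of this partition
    if candidates.size < delta
      raise ArgumentError, "not enough brokers for partition #{part.id}"
    end
    # Least-loaded first; broker id breaks ties (an assumption of this sketch).
    chosen = candidates.sort_by { |id| [counts[id], id] }.first(delta)
    chosen.each { |id| counts[id] += 1 }
    plan[part.id] = part.replicas + chosen
  end
end

# Four partitions that all live on broker 1, going from RF 1 to RF 2.
partitions = (0..3).map { |i| PartitionSketch.new(i, [1]) }
plan = spread_new_replicas(partitions, [1, 2, 3, 4], 1)
plan.each { |id, replicas| puts "partition #{id}: #{replicas.inspect}" }
```

Because the per-broker count persists across partitions, the new replicas rotate through brokers 2, 3 and 4 before any of them is used a second time.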



# File 'lib/kafkat/command/set-replication-factor.rb', line 135

def increase_rf(topic, current_rf, new_rf, brokers)
  unless new_rf > current_rf
    raise 'New replication factor must be greater than the current replication factor'
  end

  delta_rf = new_rf - current_rf
  if delta_rf > brokers.size
    # `p` is not defined in this scope; the topic name comes from `topic`.
    raise "#{delta_rf} new replicas requested for topic #{topic.name} but only #{brokers.size} brokers available"
  end

  broker_counts = brokers.map { |b| {:id => b, :count => 0} }

  assignments = []
  topic.partitions.each do |p|
    existing_replicas = p.replicas
    pick_from = broker_counts.reject { |b| existing_replicas.include?(b[:id]) }
    if delta_rf > pick_from.size
      raise "Cannot create #{delta_rf} new replicas for partition #{p.topic_name}.#{p.id}, not enough brokers"
    end
    new_replicas = pick_from.sort { |a, b| a[:count] <=> b[:count] }[0...delta_rf]
    new_replicas.each { |b| b[:count] += 1 }

    final_replicas = existing_replicas + new_replicas.map { |b| b[:id] }

    assignments << Assignment.new(topic.name, p.id, final_replicas)
  end
  assignments
end

#reduce_rf(topic, current_rf, new_rf) ⇒ Object

For every partition, remove the last replica from the replica list. If the last replica is the leader, then the previous replica is removed instead.
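
A minimal sketch of that removal rule, written as a standalone function. `drop_trailing_replicas` is a hypothetical name used only for illustration; it works on a copy so that the caller's replica list is left untouched.

```ruby
# Returns a copy of `replicas` with `delta` entries removed from the tail,
# skipping over `leader` so that it always survives.
def drop_trailing_replicas(replicas, leader, delta)
  kept = replicas.dup
  delta.times do
    idx = kept.rindex { |id| id != leader }  # last replica that is not the leader
    raise ArgumentError, 'only the leader is left' if idx.nil?
    kept.delete_at(idx)
  end
  kept
end

p drop_trailing_replicas([1, 2, 3], 1, 1)  # leader first: the last replica is dropped
p drop_trailing_replicas([1, 2, 3], 3, 1)  # leader last: the previous replica is dropped
p drop_trailing_replicas([1, 2, 3], 2, 2)  # only the leader remains
```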



# File 'lib/kafkat/command/set-replication-factor.rb', line 99

def reduce_rf(topic, current_rf, new_rf)
  delta_rf = current_rf - new_rf
  if current_rf == 1
    raise 'Current replication factor is 1. Cannot reduce further.'
  end
  unless delta_rf > 0
    raise "New replication factor (#{new_rf}) must be smaller than current replication factor (#{current_rf})"
  end
  assignments = []
  topic.partitions.each do |p|
    # Work on a copy so the partition's own replica list is not mutated.
    new_replicas = p.replicas.dup

    (0...delta_rf).each do |_|
      (0...new_replicas.size).each do |i|
        if new_replicas[new_replicas.size-1-i] != p.leader
          new_replicas.delete_at(new_replicas.size-1-i)
          break
        end
      end
    end

    if new_replicas.size != new_rf
      raise 'Unexpected state'
    end
    assignments << Assignment.new(topic.name, p.id, new_replicas)
  end
  assignments
end

#run ⇒ Object



# File 'lib/kafkat/command/set-replication-factor.rb', line 25

def run
  topic_name = ARGV.shift unless ARGV[0] && ARGV[0].start_with?('--')

  all_brokers = zookeeper.get_brokers
  topics = topic_name && zookeeper.get_topics([topic_name])
  topics ||= zookeeper.get_topics

  opts = Trollop.options do
    opt :brokers, "the comma-separated list of brokers the new replicas must be assigned to", type: :string
    opt :newrf, "the new replication factor", type: :integer, required: true
  end

  broker_ids = opts[:brokers] && opts[:brokers].split(',').map(&:to_i)
  new_rf = opts[:newrf]

  if new_rf < 1
    puts "ERROR: replication factor is smaller than 1"
    exit 1
  end

  broker_ids ||= zookeeper.get_brokers.values.map(&:id)

  all_brokers_id = all_brokers.values.map(&:id)
  broker_ids.each do |id|
    if !all_brokers_id.include?(id)
      puts "ERROR: Broker #{id} is not currently active.\n"
      exit 1
    end
  end

  broker_count = broker_ids.size
  if new_rf > broker_count
    puts "ERROR: Replication factor is larger than number of brokers.\n"
    exit 1
  end

  assignments = []
  topics.each do |_, t|
    current_rf = t.partitions[0].replicas.size
    if new_rf < current_rf
      warn_reduce_brokers if opts[:brokers]
      assignments += reduce_rf(t, current_rf, new_rf)
    elsif new_rf > current_rf
      assignments += increase_rf(t, current_rf, new_rf, broker_ids)
    end
  end

  # ****************
  if assignments.empty?
    puts "No partition reassignment required"
  else
    print "This operation executes the following assignments:\n\n"
    print_assignment_header
    assignments.each { |a| print_assignment(a) }
    print "\n"

    return unless agree("Proceed (y/n)?")

    begin
      print "\nBeginning.\n"
      admin.reassign!(assignments)
      print "Started.\n"
    rescue Admin::ExecutionFailedError => e
      # `result` was never assigned when reassign! raised, so report the error itself.
      print "#{e.message}\n"
    end
  end
end
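
The checks that `run` performs before it builds any assignment can be read as a small pure function. The sketch below is an illustration under assumptions: `rf_request_error` is a hypothetical helper that does not exist in kafkat, and it returns an error string instead of printing and calling `exit 1`.

```ruby
# Returns nil when the request is acceptable, otherwise an error message.
#   new_rf        - requested replication factor
#   requested_ids - broker ids from --brokers, or nil when the flag is absent
#   active_ids    - ids of the brokers currently registered
def rf_request_error(new_rf, requested_ids, active_ids)
  return 'replication factor is smaller than 1' if new_rf < 1

  targets = requested_ids || active_ids  # no --brokers means every active broker
  inactive = targets - active_ids
  return "Broker #{inactive.first} is not currently active." unless inactive.empty?

  return 'Replication factor is larger than number of brokers.' if new_rf > targets.size

  nil
end

p rf_request_error(3, nil, [1, 2, 3])     # accepted: all three brokers are usable
p rf_request_error(3, [1, 2], [1, 2, 3])  # rejected: only two brokers were listed
p rf_request_error(2, [1, 9], [1, 2, 3])  # rejected: broker 9 is not active
```

Note that the broker-count check applies to the brokers listed with `--brokers` when that flag is given, not to the whole cluster.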

#warn_reduce_brokers ⇒ Object



# File 'lib/kafkat/command/set-replication-factor.rb', line 164

def warn_reduce_brokers
  return if @did_warn_reduce_brokers
  puts "When reducing the replication factor the list of specified brokers is ignored."
  puts "Once the replication factor is set, you can use the reassign command."
  @did_warn_reduce_brokers = true
end