Class: Karafka::Admin::Replication
- Inherits: Karafka::Admin
  - Object
  - Karafka::Admin
  - Karafka::Admin::Replication
- Defined in: lib/karafka/admin/replication.rb
Overview
Note: This class only generates plans; actual execution requires Kafka’s Java tools.
Note: Always verify broker capacity before increasing replication.
Replication administration operations helper.
Generates partition reassignment plans for increasing topic replication factor. Since librdkafka does not support changing replication factors directly, this class generates the necessary JSON configuration that can be executed using Kafka’s Java-based reassignment tools.
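As a rough illustration of the kind of JSON involved, the sketch below builds a document in the format read by Kafka's `kafka-reassign-partitions.sh` tool (`version` plus a `partitions` list of topic, partition and replicas). The helper name `build_reassignment_json` and the `{ partition_id => [broker_ids] }` input shape are assumptions made for this example; the exact output of `#reassignment_json` is not shown on this page and may differ.

```ruby
require "json"

# Hypothetical helper (not part of Karafka): converts an assignment of
# partition_id => [broker_ids] into the reassignment JSON format accepted
# by kafka-reassign-partitions.sh.
def build_reassignment_json(topic, partitions_assignment)
  partitions = partitions_assignment.sort.map do |partition_id, broker_ids|
    { topic: topic, partition: partition_id, replicas: broker_ids }
  end

  JSON.pretty_generate({ version: 1, partitions: partitions })
end

# Example input: two partitions, each placed on three brokers
assignment = { 0 => [1, 2, 3], 1 => [2, 3, 1] }
puts build_reassignment_json("events", assignment)
```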
## Important Considerations
Replication factor changes are among the most resource-intensive operations in Kafka.
## Prerequisites
- **Sufficient Disk Space**: Ensure target brokers have enough space for new replicas
- **Network Capacity**: Verify the network can handle additional replication traffic
- **Broker Count**: The target replication factor cannot exceed the number of available brokers
- **Java Tools**: Kafka’s reassignment tools must be available
## Best Practices
- **Test First**: Always test on small topics or in staging environments
- **Monitor Resources**: Watch disk space, network, and CPU during replication
- **Incremental Changes**: Increase replication factor by 1 at a time for large topics
- **Off-Peak Hours**: Execute during low-traffic periods to minimize impact
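The broker-count prerequisite and the "one step at a time" practice can be combined in a small planning helper. This is only a sketch: `incremental_targets` is a hypothetical name, and the rule that the target must be greater than the current factor is an assumption based on this class describing replication increases.

```ruby
# Hypothetical helper (not part of Karafka): lists the intermediate
# replication factors to plan, one step at a time, and rejects targets
# that the cluster cannot host.
def incremental_targets(current_rf:, target_rf:, broker_count:)
  # Assumption: only increases are planned this way
  raise ArgumentError, "target must exceed current factor" unless target_rf > current_rf
  # A partition cannot have more replicas than there are brokers
  raise ArgumentError, "target exceeds broker count" if target_rf > broker_count

  ((current_rf + 1)..target_rf).to_a
end

p incremental_targets(current_rf: 1, target_rf: 3, broker_count: 5)
```

Each returned value could then be passed as `to:` in a separate `plan` call, starting the next step only after the previous reassignment has finished.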
Instance Attribute Summary
- #current_replication_factor ⇒ Object (readonly)
  Returns the value of attribute current_replication_factor.
- #execution_commands ⇒ Object (readonly)
  Returns the value of attribute execution_commands.
- #partitions_assignment ⇒ Object (readonly)
  Returns the value of attribute partitions_assignment.
- #reassignment_json ⇒ Object (readonly)
  Returns the value of attribute reassignment_json.
- #steps ⇒ Object (readonly)
  Returns the value of attribute steps.
- #target_replication_factor ⇒ Object (readonly)
  Returns the value of attribute target_replication_factor.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Attributes inherited from Karafka::Admin
Class Method Summary
- .plan(topic:, to:, brokers: nil) ⇒ Object
  Builds a new instance and delegates to #plan.
- .rebalance(topic:) ⇒ Object
  Builds a new instance and delegates to #rebalance.
Instance Method Summary
- #export_to_file(file_path) ⇒ Object
  Export the reassignment JSON to a file.
- #initialize(kafka: nil, topic: nil, current_replication_factor: nil, target_replication_factor: nil, partitions_assignment: nil, cluster_info: nil) ⇒ Replication (constructor)
  Builds the replication plan or creates an admin operations instance.
- #plan(topic:, to:, brokers: nil) ⇒ Replication
  Plans replication factor increase for a given topic.
- #rebalance(topic:) ⇒ Replication
  Plans rebalancing of existing replicas across brokers.
- #summary ⇒ String
  Human-readable summary of the plan.
Methods inherited from Karafka::Admin
cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_partitions, #create_partitions, create_topic, #create_topic, #delete_consumer_group, delete_consumer_group, delete_topic, #delete_topic, #plan_topic_replication, plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, #read_watermark_offsets, read_watermark_offsets, #rename_consumer_group, rename_consumer_group, seek_consumer_group, #seek_consumer_group, #topic_info, topic_info, trigger_rebalance, #trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer
Constructor Details
#initialize(kafka: nil, topic: nil, current_replication_factor: nil, target_replication_factor: nil, partitions_assignment: nil, cluster_info: nil) ⇒ Replication
Builds the replication plan or creates an admin operations instance
This class serves dual purposes:
- As an admin operations instance when called with only the kafka: parameter
- As a plan result object when called with topic and plan parameters
    # File 'lib/karafka/admin/replication.rb', line 91

    def initialize(
      kafka: nil,
      topic: nil,
      current_replication_factor: nil,
      target_replication_factor: nil,
      partitions_assignment: nil,
      cluster_info: nil
    )
      super(kafka: kafka || {})

      # If topic is provided, this is a plan result object
      return unless topic

      @topic = topic
      @current_replication_factor = current_replication_factor
      @target_replication_factor = target_replication_factor
      @partitions_assignment = partitions_assignment
      @cluster_info = cluster_info

      generate_reassignment_json
      generate_execution_commands
      generate_steps

      freeze
    end
Instance Attribute Details
#current_replication_factor ⇒ Object (readonly)
Returns the value of attribute current_replication_factor.
    # File 'lib/karafka/admin/replication.rb', line 52

    def current_replication_factor
      @current_replication_factor
    end
#execution_commands ⇒ Object (readonly)
Returns the value of attribute execution_commands.
    # File 'lib/karafka/admin/replication.rb', line 52

    def execution_commands
      @execution_commands
    end
#partitions_assignment ⇒ Object (readonly)
Returns the value of attribute partitions_assignment.
    # File 'lib/karafka/admin/replication.rb', line 52

    def partitions_assignment
      @partitions_assignment
    end
#reassignment_json ⇒ Object (readonly)
Returns the value of attribute reassignment_json.
    # File 'lib/karafka/admin/replication.rb', line 52

    def reassignment_json
      @reassignment_json
    end
#steps ⇒ Object (readonly)
Returns the value of attribute steps.
    # File 'lib/karafka/admin/replication.rb', line 52

    def steps
      @steps
    end
#target_replication_factor ⇒ Object (readonly)
Returns the value of attribute target_replication_factor.
    # File 'lib/karafka/admin/replication.rb', line 52

    def target_replication_factor
      @target_replication_factor
    end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
    # File 'lib/karafka/admin/replication.rb', line 52

    def topic
      @topic
    end
Class Method Details
.plan(topic:, to:, brokers: nil) ⇒ Object
    # File 'lib/karafka/admin/replication.rb', line 68

    def plan(topic:, to:, brokers: nil)
      new.plan(topic: topic, to: to, brokers: brokers)
    end
.rebalance(topic:) ⇒ Object
    # File 'lib/karafka/admin/replication.rb', line 74

    def rebalance(topic:)
      new.rebalance(topic: topic)
    end
Instance Method Details
#export_to_file(file_path) ⇒ Object
Export the reassignment JSON to a file
    # File 'lib/karafka/admin/replication.rb', line 254

    def export_to_file(file_path)
      File.write(file_path, @reassignment_json)
      file_path
    end
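Once the JSON is on disk, it is handed to Kafka's reassignment tool. The sketch below writes a placeholder plan to a temporary file and prints `kafka-reassign-partitions.sh` invocations using that tool's `--bootstrap-server`, `--reassignment-json-file`, `--execute` and `--verify` options. The helper `reassign_command`, the `localhost:9092` address and the sample plan are assumptions for illustration; the strings held in `#execution_commands` are not shown on this page and may look different.

```ruby
require "json"
require "tmpdir"

# Hypothetical helper (not part of Karafka): builds one
# kafka-reassign-partitions.sh invocation. action is "execute" or "verify".
def reassign_command(bootstrap_server:, file_path:, action:)
  "kafka-reassign-partitions.sh --bootstrap-server #{bootstrap_server} " \
    "--reassignment-json-file #{file_path} --#{action}"
end

# Placeholder plan content; a real plan would come from #reassignment_json
reassignment_json = JSON.generate(
  { version: 1, partitions: [{ topic: "events", partition: 0, replicas: [1, 2] }] }
)

Dir.mktmpdir do |dir|
  file_path = File.join(dir, "reassignment.json")
  # Same File.write call that export_to_file performs
  File.write(file_path, reassignment_json)

  # Print the command that starts the reassignment, then the one that checks it
  %w[execute verify].each do |action|
    puts reassign_command(
      bootstrap_server: "localhost:9092", # assumed address
      file_path: file_path,
      action: action
    )
  end
end
```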
#plan(topic:, to:, brokers: nil) ⇒ Replication
Note: When using manual placement, ensure all partitions are specified.
Note: Manual placement overrides automatic distribution entirely.
Plans replication factor increase for a given topic.
Generates a detailed reassignment plan that preserves existing replica assignments while adding new replicas to meet the target replication factor. The plan uses round-robin distribution to balance new replicas across available brokers.
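The idea of keeping existing replicas and adding new ones in round-robin order can be sketched as follows. This is not Karafka's implementation, which is not shown on this page; `add_replicas_round_robin` and its `{ partition_id => [broker_ids] }` input shape are assumptions for the example.

```ruby
# Hypothetical sketch (not Karafka's algorithm): keeps each partition's
# current replicas and appends brokers in round-robin order until the
# target replication factor is reached.
def add_replicas_round_robin(current_assignment, broker_ids, target_rf)
  # Without this guard the loop below could never reach the target
  raise ArgumentError, "not enough brokers" if target_rf > broker_ids.uniq.size

  cursor = 0

  current_assignment.to_h do |partition_id, replicas|
    updated = replicas.dup

    while updated.size < target_rf
      candidate = broker_ids[cursor % broker_ids.size]
      cursor += 1
      # Skip brokers that already host a replica of this partition
      updated << candidate unless updated.include?(candidate)
    end

    [partition_id, updated]
  end
end

current = { 0 => [1], 1 => [2], 2 => [3] }
p add_replicas_round_robin(current, [1, 2, 3], 2)
```

Because the cursor is shared across partitions, consecutive partitions receive their new replicas from different brokers instead of all landing on the first one.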
    # File 'lib/karafka/admin/replication.rb', line 167

    def plan(topic:, to:, brokers: nil)
      topic_info = fetch_topic_info(topic)
      first_partition = topic_info[:partitions].first
      current_rf = first_partition[:replica_count] || first_partition[:replicas]&.size
      fetched_cluster_info = fetch_cluster_info

      # Use contract for validation
      validation_data = {
        topic: topic,
        to: to,
        brokers: brokers,
        current_rf: current_rf,
        broker_count: fetched_cluster_info[:brokers].size,
        topic_info: topic_info,
        cluster_info: fetched_cluster_info
      }

      Contracts::Replication.new.validate!(validation_data)

      partitions_assignment = brokers || generate_partitions_assignment(
        topic_info: topic_info,
        target_replication_factor: to,
        cluster_info: fetched_cluster_info
      )

      self.class.new(
        kafka: @custom_kafka,
        topic: topic,
        current_replication_factor: current_rf,
        target_replication_factor: to,
        partitions_assignment: partitions_assignment,
        cluster_info: fetched_cluster_info
      )
    end
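Since a manual `brokers:` value replaces the generated assignment entirely, it may help to check it before planning. The sketch below assumes the manual placement is a Hash of `partition_id => [broker_ids]`; that shape, and the helper `manual_placement_problems`, are assumptions for illustration and do not describe what `Contracts::Replication` actually validates.

```ruby
# Hypothetical check (not part of Karafka), assuming manual placement is a
# Hash of partition_id => [broker_ids]. Returns a list of problems, which
# is empty when the placement looks complete.
def manual_placement_problems(placement, partition_ids:, target_rf:)
  problems = []

  # Every partition of the topic must be present in the placement
  missing = partition_ids - placement.keys
  problems << "missing partitions: #{missing.join(', ')}" unless missing.empty?

  # Each partition needs exactly target_rf distinct brokers
  placement.each do |partition_id, broker_ids|
    next if broker_ids.size == target_rf && broker_ids.uniq.size == target_rf

    problems << "partition #{partition_id} needs #{target_rf} distinct brokers"
  end

  problems
end

placement = { 0 => [1, 2], 1 => [2, 2] }
puts manual_placement_problems(placement, partition_ids: [0, 1, 2], target_rf: 2)
```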
#rebalance(topic:) ⇒ Replication
Note: This maintains the same replication factor.
Note: All data will be copied to new locations during rebalancing.
Note: Consider impact on cluster resources during rebalancing.
Plans rebalancing of existing replicas across brokers.
Generates a reassignment plan that redistributes existing replicas more evenly across the cluster without changing the replication factor. Useful for:
- Balancing load after adding new brokers to the cluster
- Redistributing replicas after broker failures and recovery
- Optimizing replica placement for better resource utilization
- Moving replicas away from overloaded brokers
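To make the idea of an even spread concrete, here is a minimal sketch that places replicas by rotating through the broker list while keeping the replication factor unchanged. It is an illustration under stated assumptions, not the algorithm behind `generate_partitions_assignment`, which is not shown on this page; `spread_replicas` is a hypothetical name.

```ruby
# Hypothetical sketch (not Karafka's algorithm): assigns each partition
# replication_factor consecutive brokers, starting one broker later for
# each following partition.
def spread_replicas(partition_ids, broker_ids, replication_factor)
  raise ArgumentError, "not enough brokers" if replication_factor > broker_ids.size

  partition_ids.each_with_index.map do |partition_id, index|
    replicas = Array.new(replication_factor) do |offset|
      broker_ids[(index + offset) % broker_ids.size]
    end

    [partition_id, replicas]
  end.to_h
end

assignment = spread_replicas([0, 1, 2, 3], [1, 2, 3, 4], 2)
p assignment
# Count how many replicas each broker ends up hosting
p assignment.values.flatten.tally
```

With four partitions, four brokers and a factor of 2, every broker ends up hosting the same number of replicas, which is the kind of balance a rebalance plan aims for after brokers are added.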
    # File 'lib/karafka/admin/replication.rb', line 229

    def rebalance(topic:)
      topic_info = fetch_topic_info(topic)
      first_partition = topic_info[:partitions].first
      current_rf = first_partition[:replica_count] || first_partition[:replicas]&.size
      fetched_cluster_info = fetch_cluster_info

      partitions_assignment = generate_partitions_assignment(
        topic_info: topic_info,
        target_replication_factor: current_rf,
        cluster_info: fetched_cluster_info,
        rebalance_only: true
      )

      self.class.new(
        kafka: @custom_kafka,
        topic: topic,
        current_replication_factor: current_rf,
        target_replication_factor: current_rf,
        partitions_assignment: partitions_assignment,
        cluster_info: fetched_cluster_info
      )
    end
#summary ⇒ String
Returns human-readable summary of the plan.
    # File 'lib/karafka/admin/replication.rb', line 260

    def summary
      broker_count = @cluster_info[:brokers].size
      change = @target_replication_factor - @current_replication_factor
      broker_nodes = @cluster_info[:brokers].map do |broker_info|
        broker_info[:node_id]
      end.join(", ")

      <<~SUMMARY
        Replication Increase Plan for Topic: #{@topic}
        =====================================
        Current replication factor: #{@current_replication_factor}
        Target replication factor: #{@target_replication_factor}
        Total partitions: #{@partitions_assignment.size}
        Available brokers: #{broker_count} (#{broker_nodes})

        This plan will increase replication by adding #{change} replica(s) to each partition.
      SUMMARY
    end