Class: Karafka::Admin::Replication

Inherits:
Karafka::Admin
Defined in:
lib/karafka/admin/replication.rb

Overview

Note:

This class only generates plans; actual execution requires Kafka’s Java tools.

Note:

Always verify broker capacity before increasing replication.

Replication administration operations helper

Generates partition reassignment plans for increasing a topic’s replication factor (or rebalancing its existing replicas). Since librdkafka does not support changing replication factors directly, this class generates the reassignment JSON that Kafka’s Java-based reassignment tool (kafka-reassign-partitions.sh) then applies.
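For reference, the file kafka-reassign-partitions.sh reads is a small JSON object: a version field plus a partitions array with one topic/partition/replicas entry per partition. The sketch below builds that shape from a partition-to-brokers hash using Ruby’s standard json library. build_reassignment_json is a hypothetical helper for illustration, not Karafka’s internal implementation, and the exact formatting Karafka emits may differ.

```ruby
require "json"

# Hypothetical helper (not Karafka's internal method): turns a
# { partition_id => [broker_ids] } hash into the version-1 JSON document
# that kafka-reassign-partitions.sh reads via --reassignment-json-file.
def build_reassignment_json(topic, partitions_assignment)
  partitions = partitions_assignment.sort.map do |partition_id, broker_ids|
    # One entry per partition; the first broker in "replicas" is the preferred leader
    { topic: topic, partition: partition_id, replicas: broker_ids }
  end

  JSON.pretty_generate({ version: 1, partitions: partitions })
end

puts build_reassignment_json("events", { 0 => [1, 2, 3], 1 => [2, 3, 1] })
```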

## Important Considerations

Replication factor changes are among the most resource-intensive operations in Kafka.

## Prerequisites

  1. **Sufficient Disk Space**: Ensure target brokers have enough space for new replicas

  2. **Network Capacity**: Verify network can handle additional replication traffic

  3. **Broker Count**: The target replication factor cannot exceed the number of available brokers

  4. **Java Tools**: Kafka’s reassignment tools must be available
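To put numbers on the disk-space prerequisite: every broker that gains a replica of a partition has to store a full copy of that partition. A minimal sketch of that estimate follows, assuming you already know the current and planned assignments and an approximate on-disk size per partition (taken from your own monitoring). extra_bytes_per_broker and the sizes used are hypothetical.

```ruby
# Hypothetical pre-flight estimate: how many extra bytes each broker will
# receive once the planned assignment is applied. Only brokers that are new
# to a partition's replica list need to copy that partition's data.
def extra_bytes_per_broker(current_assignment, planned_assignment, partition_sizes)
  extra = Hash.new(0)

  planned_assignment.each do |partition_id, planned_brokers|
    added = planned_brokers - current_assignment.fetch(partition_id, [])
    added.each { |broker_id| extra[broker_id] += partition_sizes.fetch(partition_id) }
  end

  extra
end

current = { 0 => [1], 1 => [2] }
planned = { 0 => [1, 2, 3], 1 => [2, 3, 1] }
# Assumed on-disk size of one replica of each partition, in bytes
sizes = { 0 => 10 * 1024**3, 1 => 4 * 1024**3 }

extra_bytes_per_broker(current, planned, sizes).each do |broker_id, bytes|
  puts "Broker #{broker_id}: +#{bytes / 1024**3} GiB"
end
```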

## Best Practices

  • **Test First**: Always test on small topics or in staging environments

  • **Monitor Resources**: Watch disk space, network, and CPU during replication

  • **Incremental Changes**: Increase replication factor by 1 at a time for large topics

  • **Off-Peak Hours**: Execute during low-traffic periods to minimize impact
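The incremental approach boils down to walking through each intermediate replication factor and planning, executing, and verifying one step before starting the next. A sketch, where incremental_targets is a hypothetical helper and the plan call appears only in a comment:

```ruby
# Hypothetical helper: the sequence of intermediate replication factors to
# apply when raising replication one step at a time.
def incremental_targets(current_rf, target_rf)
  raise ArgumentError, "target must be higher than current" unless target_rf > current_rf

  ((current_rf + 1)..target_rf).to_a
end

incremental_targets(1, 3).each do |step_rf|
  # Each step would get its own plan, e.g.
  # Karafka::Admin::Replication.plan(topic: 'events', to: step_rf),
  # executed and verified before moving on to the next one.
  puts "Next target replication factor: #{step_rf}"
end
```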

Examples:

Basic usage: increase replication factor

# Generate plan to increase replication from 2 to 3
plan = Karafka::Admin::Replication.plan(topic: 'events', to: 3)

# Review what will happen
puts plan.summary

# Export for execution
plan.export_to_file('/tmp/increase_replication.json')

# Execute with Kafka tools (outside of Ruby)
# kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
#   --reassignment-json-file /tmp/increase_replication.json --execute

Rebalancing replicas across brokers

# Rebalance existing replicas without changing replication factor
plan = Karafka::Admin::Replication.rebalance(topic: 'events')
plan.export_to_file('/tmp/rebalance.json')

Instance Attribute Summary

Attributes inherited from Karafka::Admin

#custom_kafka

Class Method Summary

Instance Method Summary

Methods inherited from Karafka::Admin

cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_partitions, #create_partitions, create_topic, #create_topic, #delete_consumer_group, delete_consumer_group, delete_topic, #delete_topic, #plan_topic_replication, plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, #read_watermark_offsets, read_watermark_offsets, #rename_consumer_group, rename_consumer_group, seek_consumer_group, #seek_consumer_group, #topic_info, topic_info, trigger_rebalance, #trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer

Constructor Details

#initialize(kafka: nil, topic: nil, current_replication_factor: nil, target_replication_factor: nil, partitions_assignment: nil, cluster_info: nil) ⇒ Replication

Builds the replication plan or creates an admin operations instance

This class serves dual purposes:

  1. As an admin operations instance, when called with only the kafka: parameter

  2. As a plan result object, when called with topic: and the other plan parameters

Parameters:

  • kafka (defaults to: nil)

    custom Kafka configuration for admin operations (optional)

  • topic (defaults to: nil)

    topic name (for plan result)

  • current_replication_factor (defaults to: nil)

    current replication factor (for plan result)

  • target_replication_factor (defaults to: nil)

    target replication factor (for plan result)

  • partitions_assignment (defaults to: nil)

    partition-to-brokers assignment (for plan result)

  • cluster_info (defaults to: nil)

    broker information (for plan result)



# File 'lib/karafka/admin/replication.rb', line 91

def initialize(
  kafka: nil,
  topic: nil,
  current_replication_factor: nil,
  target_replication_factor: nil,
  partitions_assignment: nil,
  cluster_info: nil
)
  super(kafka: kafka || {})

  # If topic is provided, this is a plan result object
  return unless topic

  @topic = topic
  @current_replication_factor = current_replication_factor
  @target_replication_factor = target_replication_factor
  @partitions_assignment = partitions_assignment
  @cluster_info = cluster_info

  generate_reassignment_json
  generate_execution_commands
  generate_steps

  freeze
end
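The dual-purpose constructor above can be reduced to a small, self-contained sketch: an instance created without a topic acts as the operations entry point, while plan returns a new instance that holds the result and is frozen. PlanSketch is hypothetical and omits the Kafka calls, validation, and JSON generation the real class performs.

```ruby
# Hypothetical, simplified model of the dual-purpose constructor pattern.
class PlanSketch
  attr_reader :topic, :target_replication_factor, :partitions_assignment

  def initialize(topic: nil, target_replication_factor: nil, partitions_assignment: nil)
    # Without a topic this is just an operations entry point
    return unless topic

    # With a topic it becomes a plan result: store the data and freeze
    @topic = topic
    @target_replication_factor = target_replication_factor
    @partitions_assignment = partitions_assignment.dup.freeze
    freeze
  end

  # Operations side: returns a brand-new, frozen plan result
  def plan(topic:, to:, assignment:)
    self.class.new(topic: topic, target_replication_factor: to, partitions_assignment: assignment)
  end
end

operations = PlanSketch.new
result = operations.plan(topic: "events", to: 3, assignment: { 0 => [1, 2, 3] })

puts operations.frozen? # => false
puts result.frozen?     # => true
```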

Instance Attribute Details

#current_replication_factor ⇒ Object (readonly)

Returns the value of attribute current_replication_factor.



# File 'lib/karafka/admin/replication.rb', line 52

def current_replication_factor
  @current_replication_factor
end

#execution_commands ⇒ Object (readonly)

Returns the value of attribute execution_commands.



# File 'lib/karafka/admin/replication.rb', line 52

def execution_commands
  @execution_commands
end
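The contents of execution_commands are not shown on this page. As a hedged illustration of what such commands typically look like, the sketch below builds the --execute and --verify invocations of kafka-reassign-partitions.sh for an exported file, escaping arguments with Ruby’s Shellwords. reassignment_commands is a hypothetical helper, and Karafka’s actual commands may differ.

```ruby
require "shellwords"

# Hypothetical helper: builds kafka-reassign-partitions.sh invocations for a
# previously exported plan. Karafka's execution_commands may differ.
def reassignment_commands(json_path, bootstrap_server)
  base = [
    "kafka-reassign-partitions.sh",
    "--bootstrap-server", bootstrap_server,
    "--reassignment-json-file", json_path
  ]

  {
    # Starts the reassignment
    execute: Shellwords.join(base + ["--execute"]),
    # Reports progress / completion of the same reassignment
    verify: Shellwords.join(base + ["--verify"])
  }
end

commands = reassignment_commands("/tmp/increase_replication.json", "localhost:9092")
puts commands[:execute]
puts commands[:verify]
```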

#partitions_assignment ⇒ Object (readonly)

Returns the value of attribute partitions_assignment.



# File 'lib/karafka/admin/replication.rb', line 52

def partitions_assignment
  @partitions_assignment
end

#reassignment_json ⇒ Object (readonly)

Returns the value of attribute reassignment_json.



# File 'lib/karafka/admin/replication.rb', line 52

def reassignment_json
  @reassignment_json
end

#steps ⇒ Object (readonly)

Returns the value of attribute steps.



# File 'lib/karafka/admin/replication.rb', line 52

def steps
  @steps
end

#target_replication_factor ⇒ Object (readonly)

Returns the value of attribute target_replication_factor.



# File 'lib/karafka/admin/replication.rb', line 52

def target_replication_factor
  @target_replication_factor
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/karafka/admin/replication.rb', line 52

def topic
  @topic
end

Class Method Details

.plan(topic:, to:, brokers: nil) ⇒ Object

See Also:

  • #plan

Parameters:

  • topic: topic name to plan replication for

  • to: target replication factor

  • brokers (defaults to: nil)

    optional manual broker assignments per partition



# File 'lib/karafka/admin/replication.rb', line 68

def plan(topic:, to:, brokers: nil)
  new.plan(topic: topic, to: to, brokers: brokers)
end

.rebalance(topic:) ⇒ Object

See Also:

  • #rebalance

Parameters:

  • topic: name of the topic to rebalance



# File 'lib/karafka/admin/replication.rb', line 74

def rebalance(topic:)
  new.rebalance(topic: topic)
end

Instance Method Details

#export_to_file(file_path) ⇒ Object

Export the reassignment JSON to a file

Parameters:

  • file_path: path where the JSON file will be saved

Returns:

  • the given file_path



# File 'lib/karafka/admin/replication.rb', line 254

def export_to_file(file_path)
  File.write(file_path, @reassignment_json)
  file_path
end

#plan(topic:, to:, brokers: nil) ⇒ Replication

Note:

When using manual placement, ensure all partitions are specified

Note:

Manual placement overrides automatic distribution entirely

Plans replication factor increase for a given topic

Generates a detailed reassignment plan that preserves existing replica assignments while adding new replicas to meet the target replication factor. The plan uses round-robin distribution to balance new replicas across available brokers.
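The distribution described above can be sketched as follows. This is a simplified illustration, assuming a single round-robin cursor shared across partitions over the available broker IDs; add_replicas_round_robin is hypothetical, and Karafka’s private generate_partitions_assignment may differ in details such as where the cursor starts.

```ruby
# Hypothetical sketch: keeps each partition's existing replicas and appends
# brokers in round-robin order (skipping ones already present) until the
# target replication factor is reached.
def add_replicas_round_robin(current_assignment, broker_ids, target_rf)
  raise ArgumentError, "target exceeds broker count" if target_rf > broker_ids.uniq.size

  cursor = 0

  current_assignment.sort.map do |partition_id, replicas|
    # Existing replicas stay in place and keep their order (leader first)
    new_replicas = replicas.dup

    while new_replicas.size < target_rf
      candidate = broker_ids[cursor % broker_ids.size]
      cursor += 1
      # Skip brokers that already hold a replica of this partition
      new_replicas << candidate unless new_replicas.include?(candidate)
    end

    [partition_id, new_replicas]
  end.to_h
end

planned = add_replicas_round_robin({ 0 => [1], 1 => [2], 2 => [3] }, [1, 2, 3], 3)

planned.each do |partition_id, broker_ids|
  puts "Partition #{partition_id}: #{broker_ids.join(', ')}"
end
# Partition 0: 1, 2, 3
# Partition 1: 2, 1, 3
# Partition 2: 3, 1, 2
```

Sharing one cursor across partitions is what spreads the new replicas: here each broker receives exactly two of the six added replicas.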

Examples:

Increase replication from 1 to 3 with automatic distribution

plan = Replication.plan(topic: 'events', to: 3)

# Inspect the plan
puts plan.summary
puts plan.reassignment_json

# Check which brokers will get new replicas
plan.partitions_assignment.each do |partition_id, broker_ids|
  puts "Partition #{partition_id}: #{broker_ids.join(', ')}"
end

# Save and execute
plan.export_to_file('increase_rf.json')

Increase replication with manual broker placement

# Specify exactly which brokers should host each partition
plan = Replication.plan(
  topic: 'events',
  to: 3,
  brokers: {
    0 => [1, 2, 4],  # Partition 0 on brokers 1, 2, 4
    1 => [2, 3, 4],  # Partition 1 on brokers 2, 3, 4
    2 => [1, 3, 5]   # Partition 2 on brokers 1, 3, 5
  }
)

# The plan will use your exact broker specifications
puts plan.partitions_assignment
# => {0=>[1, 2, 4], 1=>[2, 3, 4], 2=>[1, 3, 5]}
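Because manual placement overrides automatic distribution entirely, it is worth checking the map before planning. A minimal sketch of such a pre-check, assuming the rules implied by the notes above (every partition listed, exactly the target number of distinct brokers each, only known broker IDs); manual_assignment_errors is hypothetical and is not a Karafka API.

```ruby
# Hypothetical pre-check for a manual brokers: map. Returns a list of
# problems; an empty list means the map passed every check.
def manual_assignment_errors(brokers_map, partition_ids, target_rf, known_broker_ids)
  errors = []

  # Every partition of the topic must be specified
  missing = partition_ids - brokers_map.keys
  errors << "missing partitions: #{missing.join(', ')}" unless missing.empty?

  brokers_map.each do |partition_id, broker_ids|
    unless broker_ids.size == target_rf
      errors << "partition #{partition_id}: expected #{target_rf} replicas"
    end
    unless broker_ids.uniq.size == broker_ids.size
      errors << "partition #{partition_id}: duplicate brokers"
    end

    unknown = broker_ids - known_broker_ids
    errors << "partition #{partition_id}: unknown brokers #{unknown.join(', ')}" unless unknown.empty?
  end

  errors
end

manual = { 0 => [1, 2, 4], 1 => [2, 3, 4], 2 => [1, 3, 5] }
p manual_assignment_errors(manual, [0, 1, 2], 3, [1, 2, 3, 4, 5]) # => []
```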

Raises:

  • if the target replication factor is not higher than the current one

  • if the target replication factor exceeds the available broker count

  • if the topic metadata cannot be fetched
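The first two conditions are simple comparisons. A stand-alone sketch of them follows; Karafka performs its own checks through Contracts::Replication, so ReplicationPlanError and the messages here are hypothetical placeholders rather than the errors the library actually raises.

```ruby
# Hypothetical error class; not part of Karafka
class ReplicationPlanError < StandardError; end

# Mirrors the first two "Raises" conditions listed above
def validate_replication_target!(current_rf:, target_rf:, broker_count:)
  if target_rf <= current_rf
    raise ReplicationPlanError, "target #{target_rf} must be higher than current #{current_rf}"
  end

  if target_rf > broker_count
    raise ReplicationPlanError, "target #{target_rf} exceeds #{broker_count} available brokers"
  end

  true
end

validate_replication_target!(current_rf: 2, target_rf: 3, broker_count: 3) # => true

begin
  validate_replication_target!(current_rf: 2, target_rf: 4, broker_count: 3)
rescue ReplicationPlanError => e
  puts e.message # prints "target 4 exceeds 3 available brokers"
end
```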

Parameters:

  • topic: name of the topic

  • to: target replication factor (must be higher than the current one)

  • brokers (defaults to: nil)

    optional manual broker assignments per partition. Keys are partition IDs and values are arrays of broker IDs. If not provided, automatic distribution (usually sufficient) is used.

Returns:

  • plan object containing JSON, commands, and instructions



# File 'lib/karafka/admin/replication.rb', line 167

def plan(topic:, to:, brokers: nil)
  topic_info = fetch_topic_info(topic)
  first_partition = topic_info[:partitions].first
  current_rf = first_partition[:replica_count] || first_partition[:replicas]&.size
  fetched_cluster_info = fetch_cluster_info

  # Use contract for validation
  validation_data = {
    topic: topic,
    to: to,
    brokers: brokers,
    current_rf: current_rf,
    broker_count: fetched_cluster_info[:brokers].size,
    topic_info: topic_info,
    cluster_info: fetched_cluster_info
  }

  Contracts::Replication.new.validate!(validation_data)

  partitions_assignment = brokers || generate_partitions_assignment(
    topic_info: topic_info,
    target_replication_factor: to,
    cluster_info: fetched_cluster_info
  )

  self.class.new(
    kafka: @custom_kafka,
    topic: topic,
    current_replication_factor: current_rf,
    target_replication_factor: to,
    partitions_assignment: partitions_assignment,
    cluster_info: fetched_cluster_info
  )
end

#rebalance(topic:) ⇒ Replication

Note:

This maintains the same replication factor

Note:

Every replica moved to a new broker has its partition data fully copied there during rebalancing

Note:

Consider impact on cluster resources during rebalancing

Plans rebalancing of existing replicas across brokers

Generates a reassignment plan that redistributes existing replicas more evenly across the cluster without changing the replication factor. Useful for:

  • Balancing load after adding new brokers to the cluster

  • Redistributing replicas after broker failures and recovery

  • Optimizing replica placement for better resource utilization

  • Moving replicas away from overloaded brokers
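One simple way to spread replicas evenly, sketched below, is to lay each partition’s replicas out consecutively around the sorted broker list, so every broker ends up with the same number of replicas, give or take one. This is an illustrative assumption, not Karafka’s rebalancing algorithm; in particular, it ignores the current placement and makes no attempt to minimize data movement. balanced_assignment is hypothetical.

```ruby
# Hypothetical even placement that keeps the replication factor unchanged.
def balanced_assignment(partition_count, broker_ids, rf)
  brokers = broker_ids.uniq.sort
  raise ArgumentError, "replication factor exceeds broker count" if rf > brokers.size

  (0...partition_count).map do |partition_id|
    # Replicas are laid out consecutively around the broker ring, so
    # partition p starts where partition p - 1 ended
    start = partition_id * rf
    replicas = (0...rf).map { |i| brokers[(start + i) % brokers.size] }
    [partition_id, replicas]
  end.to_h
end

# A 3-partition, RF 2 topic after growing the cluster from 3 to 5 brokers
balanced_assignment(3, [1, 2, 3, 4, 5], 2).each do |partition_id, replicas|
  puts "Partition #{partition_id}: #{replicas.join(', ')}"
end
# Partition 0: 1, 2
# Partition 1: 3, 4
# Partition 2: 5, 1
```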

Examples:

Rebalance after adding new brokers

# After adding brokers 4 and 5 to a 3-broker cluster
plan = Replication.rebalance(topic: 'events')

# Review how replicas will be redistributed
puts plan.summary

# Execute if distribution looks good
plan.export_to_file('rebalance.json')
# Then run: kafka-reassign-partitions.sh --execute ...

Parameters:

  • topic: name of the topic to rebalance

Returns:

  • rebalancing plan



# File 'lib/karafka/admin/replication.rb', line 229

def rebalance(topic:)
  topic_info = fetch_topic_info(topic)
  first_partition = topic_info[:partitions].first
  current_rf = first_partition[:replica_count] || first_partition[:replicas]&.size
  fetched_cluster_info = fetch_cluster_info

  partitions_assignment = generate_partitions_assignment(
    topic_info: topic_info,
    target_replication_factor: current_rf,
    cluster_info: fetched_cluster_info,
    rebalance_only: true
  )

  self.class.new(
    kafka: @custom_kafka,
    topic: topic,
    current_replication_factor: current_rf,
    target_replication_factor: current_rf,
    partitions_assignment: partitions_assignment,
    cluster_info: fetched_cluster_info
  )
end

#summary ⇒ String

Returns a human-readable summary of the plan.

Returns:

  • human-readable summary of the plan



# File 'lib/karafka/admin/replication.rb', line 260

def summary
  broker_count = @cluster_info[:brokers].size
  change = @target_replication_factor - @current_replication_factor
  broker_nodes = @cluster_info[:brokers].map do |broker_info|
    broker_info[:node_id]
  end.join(", ")

  <<~SUMMARY
    Replication Increase Plan for Topic: #{@topic}
    =====================================
    Current replication factor: #{@current_replication_factor}
    Target replication factor: #{@target_replication_factor}
    Total partitions: #{@partitions_assignment.size}
    Available brokers: #{broker_count} (#{broker_nodes})

    This plan will increase replication by adding #{change} replica(s) to each partition.
  SUMMARY
end