Class: Kafka::FFI::Admin::NewPartitions

Inherits:
OpaquePointer
Defined in:
lib/kafka/ffi/admin/new_partitions.rb

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary

Instance Method Summary

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::OpaquePointer

Class Method Details

.new(topic, partition_count) ⇒ Object

Allocates a new NewPartitions request for passing to CreatePartitions to increase the number of partitions for an existing topic.

Parameters:

  • topic (String)

    Name of the topic to adjust

  • partition_count (Integer)

    New total number of partitions for the topic (the target count, not the number of partitions to add).

Raises:

  • (ArgumentError)

    Invalid topic or partition_count



# File 'lib/kafka/ffi/admin/new_partitions.rb', line 15

def self.new(topic, partition_count)
  error = ::FFI::MemoryPointer.new(:char, 512)

  if topic.nil? || topic.empty?
    # Check in Ruby as nil will cause a segfault as of 1.3.0
    raise ArgumentError, "topic name is required"
  end

  req = ::Kafka::FFI.rd_kafka_NewPartitions_new(topic, partition_count, error, error.size)
  if req.nil?
    raise ArgumentError, error.read_string
  end

  req
ensure
  error.free
end

Instance Method Details

#destroy ⇒ Object

Destroy and free the NewPartitions, releasing its resources back to the system.



# File 'lib/kafka/ffi/admin/new_partitions.rb', line 72

def destroy
  ::Kafka::FFI.rd_kafka_NewPartitions_destroy(self)
end
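
Because #destroy has to be called explicitly, a block helper can make cleanup harder to forget. The following is a minimal sketch, not part of the gem: with_new_partitions and its factory keyword are hypothetical names introduced for illustration. It assumes the caller still owns the request after handing it to CreatePartitions and is responsible for destroying it.

```ruby
# Hypothetical helper (not provided by the gem): builds a request, yields
# it, and always destroys it afterwards, even if the block raises.
#
# `factory` is an assumption made for testability. It is any object that
# responds to `new(topic, partition_count)` and returns something that
# responds to `destroy`. It defaults to the class documented on this page.
def with_new_partitions(topic, partition_count, factory: ::Kafka::FFI::Admin::NewPartitions)
  # If construction raises (for example ArgumentError), there is nothing
  # to destroy, so the allocation stays outside the begin/ensure.
  request = factory.new(topic, partition_count)

  begin
    yield request
  ensure
    request.destroy
  end
end

# Usage sketch:
#
#   with_new_partitions("events", 5) do |request|
#     # pass `request` to CreatePartitions here
#   end
```

The helper returns the value of the block, so a result computed inside it remains available after the request has been freed.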

#set_replica_assignment(partition_index, broker_ids) ⇒ Object

Note:

This MUST either be called for all new partitions or not at all.

Assigns the new partition at partition_index to be replicated on the brokers listed in broker_ids. The index is relative to the existing partition count, so the first new partition has index 0. If this method is used at all, it must be called once for each new partition, in consecutive order starting with an index of 0.

Examples:

Setting replica assignments for two new partitions

# Topic already has 3 partitions and replication factor of 2.
request = Kafka::FFI::Admin::NewPartitions.new("topic", 5)
request.set_replica_assignment(0, [1001, 1003])
request.set_replica_assignment(1, [1002, 1001])
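
When more than a couple of partitions are added, the index and broker pairs can be computed before any call is made. The sketch below is one possible approach, not something the gem provides: plan_replica_assignments is a hypothetical helper, and the round-robin rotation of broker IDs is an illustrative placement strategy, not a recommendation from the library.

```ruby
# Hypothetical planning helper (not part of the gem). Returns one
# [partition_index, broker_ids] pair per new partition, with indexes
# starting at 0 and increasing consecutively, as the method requires.
def plan_replica_assignments(existing_count, new_total, broker_ids, replication_factor)
  unless new_total > existing_count
    raise ArgumentError, "new_total must be greater than existing_count"
  end
  if replication_factor > broker_ids.length
    raise ArgumentError, "replication_factor exceeds the number of brokers"
  end

  (0...(new_total - existing_count)).map do |index|
    # Rotate the broker list by the relative index so that replicas are
    # spread across brokers in a simple round-robin fashion.
    [index, broker_ids.rotate(index).first(replication_factor)]
  end
end

# Topic has 3 partitions and a replication factor of 2; grow it to 5.
plan = plan_replica_assignments(3, 5, [1001, 1002, 1003], 2)
# plan is [[0, [1001, 1002]], [1, [1002, 1003]]]

# Applying the plan to a request would then look like:
#
#   plan.each { |index, replicas| request.set_replica_assignment(index, replicas) }
```

Building the plan first keeps the calls consecutive by construction and allows the assignments to be inspected or logged before they are sent.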

Parameters:

  • partition_index (Integer)

    Index of the new partition being created.

  • broker_ids (Array<Integer>)

    Broker IDs to be assigned a replica of the partition. The number of broker IDs should match the topic's replication factor.

Raises:

  • (Kafka::ResponseError)

    Arguments were invalid or the method was not called with consecutive partition_index values.



# File 'lib/kafka/ffi/admin/new_partitions.rb', line 54

def set_replica_assignment(partition_index, broker_ids)
  error = ::FFI::MemoryPointer.new(:char, 512)

  broker_list = ::FFI::MemoryPointer.new(:int32, broker_ids.length)
  broker_list.write_array_of_int32(broker_ids)

  resp = ::Kafka::FFI.rd_kafka_NewPartitions_set_replica_assignment(self, partition_index, broker_list, broker_ids.length, error, error.size)
  if resp != :ok
    raise ::Kafka::ResponseError.new(resp, error.read_string)
  end

  nil
ensure
  error.free
end