Class: Kafka::FFI::Admin::NewTopic

Inherits:
OpaquePointer
Defined in:
lib/kafka/ffi/admin/new_topic.rb

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary

Instance Method Summary

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::OpaquePointer

Class Method Details

.new(name, partitions, replication_factor) ⇒ NewTopic

Create a new NewTopic for passing to Admin::Client#create_topics. It is the application’s responsibility to call #destroy when done with the object.

Parameters:

  • name (String)

    Name of the topic to create

  • partitions (Integer)

    Number of partitions in the topic

  • replication_factor (Integer)

    Default replication factor for the topic’s partitions.

  • replication_factor (-1)

    Pass -1 to use the replica assignment set with #set_replica_assignment instead.

Returns:

  • (NewTopic)

Raises:

  • (ArgumentError)

    Parameters were invalid



# File 'lib/kafka/ffi/admin/new_topic.rb', line 23

def self.new(name, partitions, replication_factor)
  # Allocate memory for the error message
  error = ::FFI::MemoryPointer.new(:char, 512)

  if name.nil? || name.empty?
    raise ArgumentError, "name is required and cannot be blank"
  end

  obj = ::Kafka::FFI.rd_kafka_NewTopic_new(name, partitions, replication_factor, error, error.size)
  if obj.nil?
    raise ArgumentError, error.read_string
  end

  obj
ensure
  error.free
end
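
Because the application must call #destroy itself, it can help to pair creation and cleanup in one place. The following is a minimal sketch, not part of the gem: `with_new_topic` is a hypothetical helper, and its `factory:` argument is assumed to be Kafka::FFI::Admin::NewTopic (or any class whose `.new` takes the same three arguments and whose instances respond to `#destroy`).

```ruby
# Hypothetical helper (not provided by the library): builds a topic via
# `factory`, yields it to the block, and always calls #destroy afterwards,
# even if the block raises.
def with_new_topic(name, partitions, replication_factor, factory:)
  topic = factory.new(name, partitions, replication_factor)
  yield topic
ensure
  # `topic` is nil when factory.new itself raised, so guard the call.
  topic&.destroy
end

# Assumed usage with the real class:
#
#   with_new_topic("events", 3, 1, factory: Kafka::FFI::Admin::NewTopic) do |topic|
#     # pass `topic` to Admin::Client#create_topics here
#   end
```

The block's return value is passed through, so the helper can wrap whatever call consumes the topic.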

Instance Method Details

#destroy ⇒ Object

Release the memory held by NewTopic back to the system. This must be called by the application when it is done with the object.



# File 'lib/kafka/ffi/admin/new_topic.rb', line 85

def destroy
  if !pointer.null?
    ::Kafka::FFI.rd_kafka_NewTopic_destroy(self)
  end
end

#set_config(name, value) ⇒ Object

Set a broker-side topic configuration name/value pair.

Raises:

  • (Kafka::ResponseError)

    The underlying call returned an error code other than :ok



# File 'lib/kafka/ffi/admin/new_topic.rb', line 74

def set_config(name, value)
  err = ::Kafka::FFI.rd_kafka_NewTopic_set_config(self, name, value)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
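
Since #set_config takes one name/value pair per call, applying several settings means one call per pair. A minimal sketch, assuming a hypothetical helper `apply_topic_config` (not part of the gem) and assuming that names and values should be passed as Strings:

```ruby
# Hypothetical helper (not provided by the library): calls #set_config once
# for every entry in `settings`, converting keys and values to Strings.
def apply_topic_config(topic, settings)
  settings.each do |name, value|
    topic.set_config(name.to_s, value.to_s)
  end
  topic
end

# Assumed usage with a NewTopic instance:
#
#   apply_topic_config(topic, { "retention.ms" => 60_000, "cleanup.policy" => "compact" })
```

Any Kafka::ResponseError raised by #set_config propagates to the caller, so later pairs are not applied after a failure.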

#set_replica_assignment(partition, broker_ids) ⇒ Object

Note:

If called, must be called consecutively for each partition, starting at 0.

Note:

.new must have been called with a replication_factor of -1.

Set the broker assignment for partition to the replica set in broker_ids.

Parameters:

  • partition (Integer)

    Partition to assign to the brokers

  • broker_ids (Integer, Array<Integer>)

    Brokers that will be assigned the partition for the topic.

Raises:

  • (Kafka::ResponseError)

    The underlying call returned an error code other than :ok



# File 'lib/kafka/ffi/admin/new_topic.rb', line 52

def set_replica_assignment(partition, broker_ids)
  broker_ids = Array(broker_ids)

  brokers = ::FFI::MemoryPointer.new(:int32, broker_ids.size)
  error   = ::FFI::MemoryPointer.new(:char, 512)

  brokers.write_array_of_int32(broker_ids)

  err = ::Kafka::FFI.rd_kafka_NewTopic_set_replica_assignment(self, partition, brokers, broker_ids.size, error, error.size)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
ensure
  error.free
  brokers.free
end
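
The notes above require one call per partition, in order, starting at 0. A minimal sketch of that ordering, assuming a hypothetical helper `assign_replicas` (not part of the gem) that takes an Array whose index is the partition number and whose elements are the broker ids for that partition. The topic is assumed to have been created with a replication_factor of -1.

```ruby
# Hypothetical helper (not provided by the library): walks `assignments` in
# index order so partitions are assigned consecutively from 0.
def assign_replicas(topic, assignments)
  assignments.each_with_index do |broker_ids, partition|
    topic.set_replica_assignment(partition, broker_ids)
  end
  topic
end

# Assumed usage for a three-partition topic spread over brokers 1, 2 and 3:
#
#   assign_replicas(topic, [[1, 2], [2, 3], [3, 1]])
```

Using the Array index as the partition number makes it impossible to skip a partition or call out of order.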