Class: Rdkafka::Admin

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/admin.rb,
lib/rdkafka/admin/create_topic_handle.rb,
lib/rdkafka/admin/create_topic_report.rb,
lib/rdkafka/admin/delete_topic_handle.rb,
lib/rdkafka/admin/delete_topic_report.rb

Defined Under Namespace

Classes: CreateTopicHandle, CreateTopicReport, DeleteTopicHandle, DeleteTopicReport

Instance Method Summary collapse

Instance Method Details

#closeObject

Close this admin instance



20
21
22
23
24
# File 'lib/rdkafka/admin.rb', line 20

def close
  return if closed?
  ObjectSpace.undefine_finalizer(self)
  @native_kafka.close
end

#closed?Boolean

Whether this admin has closed

Returns:

  • (Boolean)


27
28
29
# File 'lib/rdkafka/admin.rb', line 27

def closed?
  @native_kafka.closed?
end

#create_topic(topic_name, partition_count, replication_factor, topic_config = {}) ⇒ CreateTopicHandle

Create a topic with the given partition count and replication factor

Returns:

  • (CreateTopicHandle)

    Create topic handle that can be used to wait for the result of creating the topic

Raises:

  • (ConfigError)

    When the partition count or replication factor are out of valid range

  • (RdkafkaError)

    When the topic name is invalid or the topic already exists

  • (RdkafkaError)

    When the topic configuration is invalid



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/rdkafka/admin.rb', line 38

def create_topic(topic_name, partition_count, replication_factor, topic_config={})
  closed_admin_check(__method__)

  # Create a rd_kafka_NewTopic_t representing the new topic
  error_buffer = FFI::MemoryPointer.from_string(" " * 256)
  new_topic_ptr = Rdkafka::Bindings.rd_kafka_NewTopic_new(
    FFI::MemoryPointer.from_string(topic_name),
    partition_count,
    replication_factor,
    error_buffer,
    256
  )
  if new_topic_ptr.null?
    raise Rdkafka::Config::ConfigError.new(error_buffer.read_string)
  end

  unless topic_config.nil?
    topic_config.each do |key, value|
      Rdkafka::Bindings.rd_kafka_NewTopic_set_config(
        new_topic_ptr,
        key.to_s,
        value.to_s
      )
    end
  end

  # Note that rd_kafka_CreateTopics can create more than one topic at a time
  pointer_array = [new_topic_ptr]
  topics_array_ptr = FFI::MemoryPointer.new(:pointer)
  topics_array_ptr.write_array_of_pointer(pointer_array)

  # Get a pointer to the queue that our request will be enqueued on
  queue_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
  end
  if queue_ptr.null?
    Rdkafka::Bindings.rd_kafka_NewTopic_destroy(new_topic_ptr)
    raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
  end

  # Create and register the handle we will return to the caller
  create_topic_handle = CreateTopicHandle.new
  create_topic_handle[:pending] = true
  create_topic_handle[:response] = -1
  CreateTopicHandle.register(create_topic_handle)
  admin_options_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_AdminOptions_new(inner, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATETOPICS)
  end
  Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, create_topic_handle.to_ptr)

  begin
    @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_CreateTopics(
        inner,
        topics_array_ptr,
        1,
        admin_options_ptr,
        queue_ptr
      )
    end
  rescue Exception
    CreateTopicHandle.remove(create_topic_handle.to_ptr.address)
    raise
  ensure
    Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
    Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
    Rdkafka::Bindings.rd_kafka_NewTopic_destroy(new_topic_ptr)
  end

  create_topic_handle
end

#delete_topic(topic_name) ⇒ DeleteTopicHandle

Delete the named topic

Returns:

  • (DeleteTopicHandle)

    Delete topic handle that can be used to wait for the result of deleting the topic

Raises:

  • (RdkafkaError)

    When the topic name is invalid or the topic does not exist



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/rdkafka/admin.rb', line 115

def delete_topic(topic_name)
  closed_admin_check(__method__)

  # Create a rd_kafka_DeleteTopic_t representing the topic to be deleted
  delete_topic_ptr = Rdkafka::Bindings.rd_kafka_DeleteTopic_new(FFI::MemoryPointer.from_string(topic_name))

  # Note that rd_kafka_DeleteTopics can create more than one topic at a time
  pointer_array = [delete_topic_ptr]
  topics_array_ptr = FFI::MemoryPointer.new(:pointer)
  topics_array_ptr.write_array_of_pointer(pointer_array)

  # Get a pointer to the queue that our request will be enqueued on
  queue_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
  end
  if queue_ptr.null?
    Rdkafka::Bindings.rd_kafka_DeleteTopic_destroy(delete_topic_ptr)
    raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
  end

  # Create and register the handle we will return to the caller
  delete_topic_handle = DeleteTopicHandle.new
  delete_topic_handle[:pending] = true
  delete_topic_handle[:response] = -1
  DeleteTopicHandle.register(delete_topic_handle)
  admin_options_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_AdminOptions_new(inner, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DELETETOPICS)
  end
  Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, delete_topic_handle.to_ptr)

  begin
    @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_DeleteTopics(
        inner,
        topics_array_ptr,
        1,
        admin_options_ptr,
        queue_ptr
      )
    end
  rescue Exception
    DeleteTopicHandle.remove(delete_topic_handle.to_ptr.address)
    raise
  ensure
    Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
    Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
    Rdkafka::Bindings.rd_kafka_DeleteTopic_destroy(delete_topic_ptr)
  end

  delete_topic_handle
end

#finalizerObject



15
16
17
# File 'lib/rdkafka/admin.rb', line 15

def finalizer
  ->(_) { close }
end