Class: Kafka::Admin

Inherits:
Object
Defined in:
lib/kafka/admin.rb

Overview

Admin provides a client for accessing the librdkafka Admin API to make changes to the cluster. The API provides ways to create topics, delete topics, add new partitions to a topic, and manage configs.

Constructor Details

#initialize(config = nil) ⇒ Admin

Create a new Admin client for accessing the librdkafka Admin API.

Parameters:

  • config (defaults to: nil)

    Configuration passed to the underlying Kafka::FFI::Producer.

# File 'lib/kafka/admin.rb', line 14

def initialize(config = nil)
  # Wrap a Producer since it appears to allocate the fewest resources.
  @client = ::Kafka::FFI::Producer.new(config)
end

Instance Method Details

#create_topic(name, partitions, replication_factor, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult

Create a topic with the given name, number of partitions, and number of replicas per partition (replication factor). The total number of partition replicas in the cluster will be partitions x replication_factor.
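
The arithmetic behind that last sentence can be sketched in plain Ruby. The helper name total_replicas is hypothetical and is not part of Kafka::Admin; it only multiplies the two arguments that #create_topic takes.

```ruby
# Hypothetical helper for illustration; it is not part of Kafka::Admin.
# Each of the `partitions` partitions is stored `replication_factor` times,
# so the cluster ends up holding this many partition replicas.
def total_replicas(partitions, replication_factor)
  partitions * replication_factor
end

puts total_replicas(6, 3) # prints 18
```

A topic with 6 partitions and a replication factor of 3 still has 6 partitions; the 18 counts the copies spread across brokers.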

Parameters:

  • name (String)

    Name of the topic to create

  • partitions (Integer)

    Number of partitions the topic will have

  • replication_factor (Integer)

    Number of replicas per partition to have in the cluster.

  • wait (Boolean) (defaults to: true)

    Wait up to timeout milliseconds for topic creation to propagate to the cluster before returning.

  • validate (Boolean) (defaults to: false)

    Only validate the request without creating the topic.

  • timeout (Integer) (defaults to: nil)

    Time to wait in milliseconds for each operation to complete. Total request execution time may be longer than timeout due to multiple operations being done. Defaults to the `socket.timeout.ms` config setting.

Returns:

  • (nil)

    Create timed out

  • (TopicResult)

    Response from the cluster indicating whether the topic was created, along with any errors.
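
Because a timeout is reported as nil rather than as an exception, callers may want to check the return value explicitly. The following is a minimal sketch, assuming only the documented #create_topic signature and its nil-on-timeout behavior. The wrapper create_topic_or_raise and the StubAdmin class are hypothetical; the stub stands in for Kafka::Admin so the sketch runs without a broker.

```ruby
# Hypothetical wrapper: turns the nil "timed out" return value into an error.
def create_topic_or_raise(admin, name, partitions, replication_factor, timeout: 5000)
  result = admin.create_topic(name, partitions, replication_factor, timeout: timeout)
  raise "creating topic #{name.inspect} timed out" if result.nil?

  result
end

# Stand-in for Kafka::Admin (an assumption for this sketch). It returns
# whatever it was constructed with instead of talking to a cluster.
class StubAdmin
  def initialize(result)
    @result = result
  end

  def create_topic(_name, _partitions, _replication_factor, wait: true, validate: false, timeout: nil)
    @result
  end
end

# A non-nil result is passed through unchanged.
create_topic_or_raise(StubAdmin.new(:topic_result), "events", 6, 3)
```

With a real client, the returned TopicResult would still need to be inspected for errors reported by the cluster; this sketch only covers the timeout case.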

# File 'lib/kafka/admin.rb', line 39

def create_topic(name, partitions, replication_factor, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::NewTopic.new(name, partitions, replication_factor)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.create_topics(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end

#delete_topic(name, wait: true, validate: false, timeout: nil) ⇒ nil, TopicResult

Delete the topic with the given name.

Parameters:

  • name (String)

    Name of the topic to delete

  • wait (Boolean) (defaults to: true)

    Wait up to timeout milliseconds for topic deletion to propagate to the cluster before returning.

  • validate (Boolean) (defaults to: false)

    Only validate the request without deleting the topic.

  • timeout (Integer) (defaults to: nil)

    Time to wait in milliseconds for each operation to complete. Total request execution time may be longer than timeout due to multiple operations being done. Defaults to the `socket.timeout.ms` config setting.

Returns:

  • (nil)

    Delete timed out

  • (TopicResult)

    Response from the cluster with details about the deletion or any errors.
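
When several topics are deleted in a row, the nil return value can be used to collect the names that timed out. This is a sketch under the assumption that #delete_topic behaves as documented above. The helper timed_out_deletions and the SlowDeleteAdmin stand-in are hypothetical names introduced for illustration.

```ruby
# Hypothetical helper: calls #delete_topic once per name and returns the
# names for which the call returned nil (that is, timed out).
def timed_out_deletions(admin, names, timeout: 5000)
  names.select { |name| admin.delete_topic(name, timeout: timeout).nil? }
end

# Stand-in for Kafka::Admin (an assumption for this sketch). Deleting any of
# the names it was given simulates a timeout by returning nil.
class SlowDeleteAdmin
  def initialize(slow_names)
    @slow_names = slow_names
  end

  def delete_topic(name, wait: true, validate: false, timeout: nil)
    @slow_names.include?(name) ? nil : :topic_result
  end
end

admin = SlowDeleteAdmin.new(["audit"])
p timed_out_deletions(admin, ["events", "audit"]) # prints ["audit"]
```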

# File 'lib/kafka/admin.rb', line 67

def delete_topic(name, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::DeleteTopic.new(name)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.delete_topics(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end

#describe_config(type, name, wait: true, validate: false, timeout: nil) ⇒ ConfigResource

Get current config settings for the resource.

Examples:

Get configuration for a topic

describe_config(:topic, "events")

Parameters:

  • type (:broker, :topic, :group)

    Type of resource

  • name (String)

    Name of the resource

Returns:

  • (ConfigResource)

# File 'lib/kafka/admin.rb', line 89

def describe_config(type, name, wait: true, validate: false, timeout: nil)
  req = ::Kafka::FFI::Admin::ConfigResource.new(type, name)
  opts = new_options(:create_topics, wait: wait, validate: validate, timeout: timeout)

  res = @client.describe_configs(req, options: opts)
  if res
    res[0]
  end
ensure
  opts.destroy
  req.destroy
end

#destroy ⇒ Object

Also known as: close

Destroy the Client, releasing all used resources back to the system. It is the application’s responsibility to call #destroy when done with the client.
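
One way to make sure #destroy is always called is to wrap the client's use in a method with an ensure clause. This is a minimal sketch, not something the library provides: with_admin and TrackedAdmin are hypothetical names, and the stand-in class replaces Kafka::Admin so the example runs without librdkafka.

```ruby
# Hypothetical helper: yields the client and destroys it afterwards, even if
# the block raises.
def with_admin(admin)
  yield admin
ensure
  admin.destroy
end

# Stand-in for Kafka::Admin (an assumption for this sketch). It only tracks
# whether #destroy was called.
class TrackedAdmin
  attr_reader :destroyed

  def initialize
    @destroyed = false
  end

  def destroy
    @destroyed = true
  end
end

admin = TrackedAdmin.new
begin
  with_admin(admin) { raise "boom" }
rescue RuntimeError
  # The error still propagates to the caller, but the client was destroyed.
end
puts admin.destroyed # prints true
```

With the real class, the argument would be a Kafka::Admin instance and the block would hold the administrative calls.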

# File 'lib/kafka/admin.rb', line 114

def destroy
  @client.destroy
end

#metadata(local_only: false, topic: nil, timeout: 1000) ⇒ Metadata

Retrieve metadata for the cluster.

Returns:

  • (Metadata)

# File 'lib/kafka/admin.rb', line 107

def metadata(local_only: false, topic: nil, timeout: 1000)
  @client.metadata(local_only: local_only, topic: topic, timeout: timeout)
end