Module: Karafka::Admin

Defined in:
lib/karafka/admin.rb

Overview

Note:

It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, this should be fine.

Note:

It always uses the primary defined cluster and does not support multi-cluster work. If you need multi-cluster support, replace the cluster info for the time you use this API.

Simple admin actions that we can perform via Karafka on our Kafka cluster

Class Method Summary collapse

Class Method Details

.cluster_info ⇒ Rdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



# File 'lib/karafka/admin.rb', line 107

def cluster_info
  with_admin do |admin|
    Rdkafka::Metadata.new(admin.instance_variable_get('@native_kafka'))
  end
end
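
A minimal usage sketch. It assumes a booted Karafka application (so the primary cluster configuration is loaded) and that the returned metadata exposes topics as hashes with a :topic_name key, as used elsewhere in Karafka::Admin:

# Fetch metadata from the primary cluster and print the topic names it reports
info = Karafka::Admin.cluster_info

info.topics.each do |topic|
  puts topic[:topic_name]
end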

.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Creates a Kafka topic with the given settings

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: kafka.apache.org/documentation/#topicconfigs



# File 'lib/karafka/admin.rb', line 87

def create_topic(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    admin.create_topic(name, partitions, replication_factor, topic_config)

    sleep(0.2) until topics_names.include?(name)
  end
end
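
A usage sketch; the topic name and config values below are illustrative:

# Create a 5-partition topic with replication factor 2 and one week of retention
Karafka::Admin.create_topic(
  'events',
  5,
  2,
  'retention.ms' => 7 * 24 * 60 * 60 * 1_000
)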

.delete_topic(name) ⇒ Object

Deletes a given topic

Parameters:

  • name (String)

    topic name



# File 'lib/karafka/admin.rb', line 98

def delete_topic(name)
  with_admin do |admin|
    admin.delete_topic(name)

    sleep(0.2) while topics_names.include?(name)
  end
end
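
A usage sketch, assuming the 'events' topic from the example above exists:

# Remove the topic; the call waits until cluster metadata no longer lists it
Karafka::Admin.delete_topic('events')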

.read_topic(name, partition, count, start_offset = -1) ⇒ Array<Karafka::Messages::Message>

Allows us to read messages from the topic

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer) (defaults to: -1)

    offset from which we should start. If -1 is provided (the default), we will start from the latest offset

Returns:

  • (Array<Karafka::Messages::Message>)

    topic messages

# File 'lib/karafka/admin.rb', line 37

def read_topic(name, partition, count, start_offset = -1)
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new

  with_consumer do |consumer|
    offsets = consumer.query_watermark_offsets(name, partition)
    end_offset = offsets.last

    start_offset = [0, offsets.last - count].max if start_offset.negative?

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count
      # If we've reached end of the topic messages, don't process more
      break if !messages.empty? && end_offset <= messages.last.offset

      message = consumer.poll(200)
      messages << message if message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  messages.map do |message|
    Messages::Builders::Message.call(
      message,
      # Use topic from routes if we can match it or create a dummy one
      # Dummy one is used in case we cannot match the topic with routes. This can happen
      # when admin API is used to read topics that are not part of the routing
      Routing::Router.find_by(name: name) || Topic.new(name, App.config.deserializer),
      Time.now
    )
  end
end
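
A usage sketch; the topic name is illustrative:

# Fetch up to 10 of the most recent messages from partition 0
messages = Karafka::Admin.read_topic('events', 0, 10)

messages.each do |message|
  puts "#{message.offset}: #{message.payload}"
end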