Module: Karafka::Admin
- Defined in:
- lib/karafka/admin.rb
Overview
Note:
It always initializes a new admin instance as we want to ensure it is always closed Since admin actions are not performed that often, that should be ok.
Note:
It always uses the primary defined cluster and does not support multi-cluster work. If you need this, just replace the cluster info for the time you use this
Simple admin actions that we can perform via Karafka on our Kafka cluster
Class Method Summary collapse
-
.cluster_info ⇒ Rdkafka::Metadata
Cluster metadata info.
-
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
Creates Kafka topic with given settings.
-
.delete_topic(name) ⇒ Object
Deleted a given topic.
-
.read_topic(name, partition, count, start_offset = -1)) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic.
Class Method Details
.cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
107 108 109 110 111 |
# File 'lib/karafka/admin.rb', line 107 def cluster_info with_admin do |admin| Rdkafka::Metadata.new(admin.instance_variable_get('@native_kafka')) end end |
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
Creates Kafka topic with given settings
87 88 89 90 91 92 93 |
# File 'lib/karafka/admin.rb', line 87 def create_topic(name, partitions, replication_factor, topic_config = {}) with_admin do |admin| admin.create_topic(name, partitions, replication_factor, topic_config) sleep(0.2) until topics_names.include?(name) end end |
.delete_topic(name) ⇒ Object
Deleted a given topic
98 99 100 101 102 103 104 |
# File 'lib/karafka/admin.rb', line 98 def delete_topic(name) with_admin do |admin| admin.delete_topic(name) sleep(0.2) while topics_names.include?(name) end end |
.read_topic(name, partition, count, start_offset = -1)) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/karafka/admin.rb', line 37 def read_topic(name, partition, count, start_offset = -1) = [] tpl = Rdkafka::Consumer::TopicPartitionList.new with_consumer do |consumer| offsets = consumer.query_watermark_offsets(name, partition) end_offset = offsets.last start_offset = [0, offsets.last - count].max if start_offset.negative? tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset) consumer.assign(tpl) # We should poll as long as we don't have all the messages that we need or as long as # we do not read all the messages from the topic loop do # If we've got as many messages as we've wanted stop break if .size >= count # If we've reached end of the topic messages, don't process more break if !.empty? && end_offset <= .last.offset = consumer.poll(200) << if rescue Rdkafka::RdkafkaError => e # End of partition break if e.code == :partition_eof raise e end end .map do || Messages::Builders::Message.call( , # Use topic from routes if we can match it or create a dummy one # Dummy one is used in case we cannot match the topic with routes. This can happen # when admin API is used to read topics that are not part of the routing Routing::Router.find_by(name: name) || Topic.new(name, App.config.deserializer), Time.now ) end end |