Module: Karafka::Admin
- Defined in:
- lib/karafka/admin.rb
Overview
Simple admin actions that we can perform via Karafka on our Kafka cluster.

It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, that should be ok.

It always uses the primary defined cluster and does not support multi-cluster work. The cluster on which operations are performed can be changed via the `admin.kafka` config, however there is no multi-cluster runtime support.
Class Method Summary collapse
-
.cluster_info ⇒ Rdkafka::Metadata
Cluster metadata info.
-
.create_partitions(name, partitions) ⇒ Object
Creates more partitions for a given topic.
-
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
Creates Kafka topic with given settings.
-
.delete_topic(name) ⇒ Object
Deletes a given topic.
-
.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic.
-
.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>
Fetches the watermark offsets for a given topic partition.
-
.with_consumer(settings = {}) ⇒ Object
Creates consumer instance and yields it.
Class Method Details
.cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
# File 'lib/karafka/admin.rb', line 155

def cluster_info
  with_admin do |admin|
    admin.instance_variable_get('@native_kafka').with_inner do |inner|
      Rdkafka::Metadata.new(inner)
    end
  end
end
.create_partitions(name, partitions) ⇒ Object
Creates more partitions for a given topic
# File 'lib/karafka/admin.rb', line 127

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { topic(name).fetch(:partition_count) >= partitions }
    )
  end
end
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
Creates Kafka topic with given settings
# File 'lib/karafka/admin.rb', line 98

def create_topic(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { topics_names.include?(name) }
    )
  end
end
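The `with_re_wait` helper used above first blocks on the operation handler and then verifies that the cluster state actually reflects the change. A minimal, self-contained sketch of that wait-then-verify pattern in plain Ruby (the lambdas below are stand-ins for `handler.wait` and the topic-existence check, not the Karafka internals):

```ruby
# Sketch of a wait-then-verify pattern: run a blocking wait, then poll a
# predicate until the cluster state reflects the change (or we give up)
def wait_then_verify(wait, verify, attempts: 5, delay: 0.01)
  wait.call

  attempts.times do
    return true if verify.call

    sleep(delay)
  end

  false
end

created = []

result = wait_then_verify(
  -> { created << 'events' },       # stand-in for handler.wait
  -> { created.include?('events') } # stand-in for topics_names.include?(name)
)
```

The second check matters because the admin handler may resolve before metadata has propagated, so a follow-up verification against the cluster avoids acting on stale state.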
.delete_topic(name) ⇒ Object
Deletes a given topic
# File 'lib/karafka/admin.rb', line 112

def delete_topic(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { !topics_names.include?(name) }
    )
  end
end
.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic
# File 'lib/karafka/admin.rb', line 25

def read_topic(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less
    start_offset = high_offset - count if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..start_offset + (count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related
    # to log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offsets that we can select. This will remove all the potential offsets
    # that are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.count

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted, stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
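The offset arithmetic in `read_topic` can be tried in isolation. A standalone sketch of how the requested range is clamped to the available watermark range (the offset numbers here are made up for illustration):

```ruby
# Watermark offsets reported by the cluster (illustrative values)
low_offset  = 100 # earliest available offset
high_offset = 110 # next offset to be written (last message sits at 109)

count        = 20
start_offset = high_offset - count # negative start resolves relative to the end => 90

# Same range intersection as in read_topic
requested_range = (start_offset..start_offset + (count - 1)) # 90..109
available_range = (low_offset..(high_offset - 1))            # 100..109
possible_range  = requested_range.select { |offset| available_range.include?(offset) }

start_offset = possible_range.first # 100 - clamped up to the low watermark
count        = possible_range.count # 10  - only what actually exists
```

This is why asking for more messages than a partition holds (for example due to retention) returns fewer messages rather than failing.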
.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>
Fetches the watermark offsets for a given topic partition
# File 'lib/karafka/admin.rb', line 143

def read_watermark_offsets(name, partition)
  with_consumer do |consumer|
    # For newly created topics or in cases where we're trying to get them but there is no
    # leader, this can fail. It happens more often for new topics under KRaft, however we
    # still want to make sure things operate as expected even then
    with_rdkafka_retry(codes: %i[not_leader_for_partition]) do
      consumer.query_watermark_offsets(name, partition)
    end
  end
end
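`with_rdkafka_retry` retries only when the raised error carries one of the listed codes. A self-contained sketch of that selective-retry idea, using a hypothetical `FakeRdkafkaError` class as a stand-in for `Rdkafka::RdkafkaError`:

```ruby
# Hypothetical stand-in for Rdkafka::RdkafkaError carrying a symbolic code
class FakeRdkafkaError < StandardError
  attr_reader :code

  def initialize(code)
    @code = code
    super(code.to_s)
  end
end

# Retry the block, but only when the raised error code is whitelisted;
# any other error (or exhausting the attempts) propagates
def with_retry(codes:, attempts: 3)
  attempts.times do |i|
    return yield
  rescue FakeRdkafkaError => e
    raise unless codes.include?(e.code)
    raise if i == attempts - 1
  end
end

calls = 0

result = with_retry(codes: %i[not_leader_for_partition]) do
  calls += 1
  # Simulate a partition without a leader for the first two attempts
  raise FakeRdkafkaError, :not_leader_for_partition if calls < 3

  [0, 42] # low and high watermark of a now-reachable partition
end
```

Restricting the retry to specific codes keeps transient leader-election hiccups invisible to the caller while still surfacing genuine failures immediately.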
.with_consumer(settings = {}) ⇒ Object
We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.
Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.
# File 'lib/karafka/admin.rb', line 170

def with_consumer(settings = {})
  consumer = config(:consumer, settings).consumer
  proxy = ::Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close
end
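The `ensure` block above guarantees teardown even when the yielded block raises, and swallowing unsubscribe errors guarantees `close` still runs. A minimal sketch of the same shape with a dummy consumer (all names here are illustrative, not Karafka internals):

```ruby
# Dummy consumer recording which lifecycle steps actually ran
class DummyConsumer
  attr_reader :steps

  def initialize(fail_unsubscribe: false)
    @steps = []
    @fail_unsubscribe = fail_unsubscribe
  end

  def unsubscribe
    raise 'broken unsubscribe' if @fail_unsubscribe

    @steps << :unsubscribed
  end

  def close
    @steps << :closed
  end
end

# Same shape as Admin.with_consumer: yield, then always unsubscribe and
# close, swallowing unsubscribe errors so close still runs
def with_dummy_consumer(consumer)
  yield(consumer)
ensure
  begin
    consumer&.unsubscribe
  rescue StandardError
    nil
  end

  consumer&.close
end

healthy = DummyConsumer.new
with_dummy_consumer(healthy) { |c| c.steps << :used }

broken = DummyConsumer.new(fail_unsubscribe: true)
with_dummy_consumer(broken) { |c| c.steps << :used }
```

Note that even when `unsubscribe` fails, the consumer is still closed, which is the property the real implementation relies on to avoid leaking connections.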