Class: Karafka::Admin::Topics
- Inherits: Karafka::Admin
  - Object → Karafka::Admin → Karafka::Admin::Topics
- Defined in:
- lib/karafka/admin/topics.rb
Overview
Topic administration operations. Provides methods to manage Kafka topics, including creation, deletion, reading, and introspection.
Constant Summary
Constants inherited from Karafka::Admin
Instance Attribute Summary
Attributes inherited from Karafka::Admin
Class Method Summary collapse
- .create(name, partitions, replication_factor, topic_config = {}) ⇒ Object
- .create_partitions(name, partitions) ⇒ Object
- .delete(name) ⇒ Object
- .info(topic_name) ⇒ Object
- .read(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
- .read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
Instance Method Summary collapse
-
#create(name, partitions, replication_factor, topic_config = {}) ⇒ void
Creates Kafka topic with given settings.
-
#create_partitions(name, partitions) ⇒ void
Creates more partitions for a given topic.
-
#delete(name) ⇒ void
Deletes a given topic.
-
#info(topic_name) ⇒ Hash
Returns basic topic metadata.
-
#read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic.
-
#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Array<Integer, Integer>, Hash
Fetches the watermark offsets for a given topic partition or multiple topics and partitions.
Methods inherited from Karafka::Admin
#cluster_info, cluster_info, #copy_consumer_group, copy_consumer_group, #create_topic, create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, #plan_topic_replication, plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, #trigger_rebalance, trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer
Constructor Details
This class inherits a constructor from Karafka::Admin
Class Method Details
.create(name, partitions, replication_factor, topic_config = {}) ⇒ Object
```ruby
# File 'lib/karafka/admin/topics.rb', line 26

def create(name, partitions, replication_factor, topic_config = {})
  new.create(name, partitions, replication_factor, topic_config)
end
```
.create_partitions(name, partitions) ⇒ Object
```ruby
# File 'lib/karafka/admin/topics.rb', line 39

def create_partitions(name, partitions)
  new.create_partitions(name, partitions)
end
```
.delete(name) ⇒ Object
```ruby
# File 'lib/karafka/admin/topics.rb', line 32

def delete(name)
  new.delete(name)
end
```
.info(topic_name) ⇒ Object
```ruby
# File 'lib/karafka/admin/topics.rb', line 52

def info(topic_name)
  new.info(topic_name)
end
```
.read(name, partition, count, start_offset = -1, settings = {}) ⇒ Object

```ruby
# File 'lib/karafka/admin/topics.rb', line 16

def read(name, partition, count, start_offset = -1, settings = {})
  new.read(name, partition, count, start_offset, settings)
end
```
.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
```ruby
# File 'lib/karafka/admin/topics.rb', line 46

def read_watermark_offsets(name_or_hash, partition = nil)
  new.read_watermark_offsets(name_or_hash, partition)
end
```
Instance Method Details
#create(name, partitions, replication_factor, topic_config = {}) ⇒ void
This method returns an undefined value.
Creates a Kafka topic with the given settings.
```ruby
# File 'lib/karafka/admin/topics.rb', line 145

def create(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { names.include?(name) }
    )
  end
end
```
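A minimal usage sketch. The topic name and config values below are hypothetical, and topic-level config values are passed as strings; the actual admin call is commented out because it requires a reachable Kafka cluster and a configured Karafka application:

```ruby
# Hypothetical settings for a compacted topic; keys are standard Kafka
# topic-level configs and values must be strings
topic_config = {
  'cleanup.policy' => 'compact',
  'retention.ms' => '86400000'
}

# Requires a running Kafka cluster:
# Karafka::Admin::Topics.create('users_events', 6, 3, topic_config)
```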
#create_partitions(name, partitions) ⇒ void
This method returns an undefined value.
Creates more partitions for a given topic
```ruby
# File 'lib/karafka/admin/topics.rb', line 178

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { info(name).fetch(:partition_count) >= partitions }
    )
  end
end
```
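A pure-Ruby sketch of the success predicate used above: the change is considered applied once the topic reports at least the requested partition count (Kafka can only ever grow a topic's partition count, never shrink it). The info hash below is a stub standing in for the result of `#info`:

```ruby
# Stub shaped like the topic info hash; values are made up
stub_info = { topic_name: 'users_events', partition_count: 6 }
requested = 4

# Same check as the re-wait lambda in #create_partitions
applied = stub_info.fetch(:partition_count) >= requested
# applied => true (6 existing partitions already satisfy a request for 4)
```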
#delete(name) ⇒ void
This method returns an undefined value.
Deletes a given topic.
```ruby
# File 'lib/karafka/admin/topics.rb', line 161

def delete(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { !names.include?(name) }
    )
  end
end
```
#info(topic_name) ⇒ Hash
This query is much more efficient than a full #cluster_info plus topic lookup, because it fetches metadata only for the topic we’re interested in rather than for all topics.
Returns basic topic metadata
```ruby
# File 'lib/karafka/admin/topics.rb', line 243

def info(topic_name)
  with_admin do |admin|
    admin
      .metadata(topic_name)
      .topics
      .find { |topic| topic[:topic_name] == topic_name }
  end
end
```
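A sketch of how the topic is selected out of the metadata response. The array below is a stub shaped like rdkafka's metadata topics list (the real call goes through `with_admin`), with made-up names and counts:

```ruby
# Stub of metadata.topics; each entry is a hash keyed by symbols
stub_topics = [
  { topic_name: 'users_events', partition_count: 6 },
  { topic_name: 'orders', partition_count: 3 }
]

# Same selection as in #info
info = stub_topics.find { |topic| topic[:topic_name] == 'orders' }
# info => { topic_name: 'orders', partition_count: 3 }
```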
#read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic
```ruby
# File 'lib/karafka/admin/topics.rb', line 68

def read(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)
    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less and move backwards with the negative
    # offset, allowing to start from N messages back from high-watermark
    start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
    start_offset = low_offset if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..(start_offset + count - 1))
    # Establish theoretical available range. Note, that this does not handle cases
    # related to log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offsets that we can select. This will remove all the potential
    # offsets that are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.size

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long
    # as we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted, stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
```
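The offset windowing at the top of #read can be illustrated in pure Ruby: a negative `start_offset` means "this many messages back from the high watermark", and the requested range is then clipped to what the partition can actually serve. The watermark values here are made up:

```ruby
# Hypothetical partition watermarks
low_offset = 100
high_offset = 150
count = 10
start_offset = -1

# Same arithmetic as #read: resolve a negative offset relative to the
# high watermark, then clamp to the low watermark
start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
start_offset = low_offset if start_offset.negative?

requested_range = (start_offset..(start_offset + count - 1))
available_range = (low_offset..(high_offset - 1))
possible_range = requested_range.select { |offset| available_range.include?(offset) }
# possible_range covers offsets 140..149: the last 10 readable messages
```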
#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Array<Integer, Integer>, Hash
Fetches the watermark offsets for a given topic partition or multiple topics and partitions
```ruby
# File 'lib/karafka/admin/topics.rb', line 216

def read_watermark_offsets(name_or_hash, partition = nil)
  # Normalize input to hash format
  topics_with_partitions = partition ? { name_or_hash => [partition] } : name_or_hash

  result = Hash.new { |h, k| h[k] = {} }

  with_consumer do |consumer|
    topics_with_partitions.each do |topic, partitions|
      partitions.each do |partition_id|
        result[topic][partition_id] = consumer.query_watermark_offsets(topic, partition_id)
      end
    end
  end

  # Return single array for single partition query, hash for multiple
  partition ? result.dig(name_or_hash, partition) : result
end
```
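The dual input shape can be sketched without a broker: a single topic/partition pair is normalized into a one-entry hash, while a hash of topic => partitions passes through unchanged. The topic names here are hypothetical:

```ruby
# Same normalization as the first line of #read_watermark_offsets
normalize = lambda do |name_or_hash, partition = nil|
  partition ? { name_or_hash => [partition] } : name_or_hash
end

single = normalize.call('users_events', 0)
# single => { 'users_events' => [0] }

multi = normalize.call('users_events' => [0, 1], 'orders' => [0])
# multi passes through unchanged with both topics as keys
```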