Class: Karafka::Admin::Topics

Inherits:
Karafka::Admin show all
Defined in:
lib/karafka/admin/topics.rb

Overview

Topic administration operations. Provides methods to manage Kafka topics, including creation, deletion, reading, and introspection.

Constant Summary

Constants inherited from Karafka::Admin

Recovery

Instance Attribute Summary

Attributes inherited from Karafka::Admin

#custom_kafka

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Karafka::Admin

#cluster_info, cluster_info, #copy_consumer_group, copy_consumer_group, #create_topic, create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, #plan_topic_replication, plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, #trigger_rebalance, trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer

Constructor Details

This class inherits a constructor from Karafka::Admin

Class Method Details

.create(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions for this topic

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described in the base topic configuration

# File 'lib/karafka/admin/topics.rb', line 26

def create(name, partitions, replication_factor, topic_config = {})
  new.create(name, partitions, replication_factor, topic_config)
end
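Each class method simply instantiates the topics admin and delegates, so `Karafka::Admin::Topics.create(...)` and `Karafka::Admin::Topics.new.create(...)` are equivalent. A minimal standalone sketch of that delegation pattern (`ExampleTopics` is a hypothetical stand-in, not the Karafka source):

```ruby
# Minimal sketch of the class-method-to-instance delegation used above.
class ExampleTopics
  class << self
    # Class-level entry point: build an instance and forward all arguments
    def create(name, partitions, replication_factor, topic_config = {})
      new.create(name, partitions, replication_factor, topic_config)
    end
  end

  # Instance method carries the actual logic (here just echoing its inputs)
  def create(name, partitions, replication_factor, topic_config = {})
    { name: name, partitions: partitions, rf: replication_factor, config: topic_config }
  end
end

ExampleTopics.create('events', 3, 1)
# => { name: 'events', partitions: 3, rf: 1, config: {} }
```

This keeps the public API convenient for one-off calls while allowing an instance (e.g. one configured with `#custom_kafka`) to be reused across operations.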

.create_partitions(name, partitions) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

# File 'lib/karafka/admin/topics.rb', line 39

def create_partitions(name, partitions)
  new.create_partitions(name, partitions)
end

.delete(name) ⇒ Object

Parameters:

  • name (String)

    topic name

# File 'lib/karafka/admin/topics.rb', line 32

def delete(name)
  new.delete(name)
end

.info(topic_name) ⇒ Object

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

# File 'lib/karafka/admin/topics.rb', line 52

def info(topic_name)
  new.info(topic_name)
end

.read(name, partition, count, start_offset = -1, settings = {}) ⇒ Object

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

# File 'lib/karafka/admin/topics.rb', line 16

def read(name, partition, count, start_offset = -1, settings = {})
  new.read(name, partition, count, start_offset, settings)
end

.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition (nil when using hash format)

# File 'lib/karafka/admin/topics.rb', line 46

def read_watermark_offsets(name_or_hash, partition = nil)
  new.read_watermark_offsets(name_or_hash, partition)
end

Instance Method Details

#create(name, partitions, replication_factor, topic_config = {}) ⇒ void

This method returns an undefined value.

Creates Kafka topic with given settings

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: kafka.apache.org/documentation/#topicconfigs



# File 'lib/karafka/admin/topics.rb', line 145

def create(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { names.include?(name) }
    )
  end
end

#create_partitions(name, partitions) ⇒ void

This method returns an undefined value.

Creates more partitions for a given topic

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with



# File 'lib/karafka/admin/topics.rb', line 178

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { info(name).fetch(:partition_count) >= partitions }
    )
  end
end
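`with_re_wait` is a Karafka internal; based on its usage here, it waits on the operation handler and then re-checks a confirmation predicate until the change becomes visible in the cluster metadata. A simplified standalone sketch of that retry idea (names and limits are illustrative, not the actual implementation):

```ruby
# Simplified retry-until-confirmed helper, illustrating the with_re_wait idea.
# The real Karafka implementation also handles rdkafka-specific errors.
def wait_until_confirmed(max_attempts: 5, sleep_time: 0.1)
  attempts = 0

  loop do
    attempts += 1
    # The block is the confirmation predicate,
    # e.g. -> { info(name).fetch(:partition_count) >= partitions }
    break if yield

    raise 'operation not confirmed in time' if attempts >= max_attempts

    sleep(sleep_time)
  end
end

# Example: pretend the new partition count becomes visible on the third check
checks = [1, 1, 5]
wait_until_confirmed(sleep_time: 0) { checks.shift >= 5 }
```

The re-check matters because topic operations in Kafka are asynchronous: the handler resolving does not guarantee every broker's metadata already reflects the change.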

#delete(name) ⇒ void

This method returns an undefined value.

Deletes a given topic

Parameters:

  • name (String)

    topic name



# File 'lib/karafka/admin/topics.rb', line 161

def delete(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { !names.include?(name) }
    )
  end
end

#info(topic_name) ⇒ Hash

Note:

This query is much more efficient than a full #cluster_info + topic lookup because it does not have to fetch data for all topics, only for the topic we’re interested in

Returns basic topic metadata

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

Returns:

  • (Hash)

    topic metadata info hash

Raises:

  • (Rdkafka::RdkafkaError)

    unknown_topic_or_part if requested topic is not found



# File 'lib/karafka/admin/topics.rb', line 243

def info(topic_name)
  with_admin do |admin|
    admin
      .metadata(topic_name)
      .topics
      .find { |topic| topic[:topic_name] == topic_name }
  end
end
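The method asks the admin for metadata scoped to a single topic and then picks the matching entry out of the `topics` array. On a sample payload (shape assumed from the hash keys used above, not a real cluster response), the filtering step looks like:

```ruby
# Sample metadata payload shaped like the `topics` array consumed by #info
topics = [
  { topic_name: 'events', partition_count: 3 },
  { topic_name: 'logs', partition_count: 1 }
]

# Same lookup as in #info: select only the topic we asked about
info = topics.find { |topic| topic[:topic_name] == 'events' }
info.fetch(:partition_count) # => 3
```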

#read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>

Allows us to read messages from the topic

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start. If -1 is provided (the default), we will start from the latest offset. If a time is provided, the matching offset will be resolved. If a negative value beyond -1 is provided, we move further backwards from the high watermark.

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

Returns:

  • (Array<Karafka::Messages::Message>)

    read messages

# File 'lib/karafka/admin/topics.rb', line 68

def read(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less and move backwards with the negative
    # offset, allowing to start from N messages back from high-watermark
    start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
    start_offset = low_offset if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..(start_offset + count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related
    # to log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offset that we can select. This will remove all the potential offsets
    # that are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.size

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
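The offset selection above can be traced with concrete numbers. Assuming watermarks of 0 and 100 and a request for the 10 most recent messages (`start_offset = -1`, the default), the arithmetic resolves as follows:

```ruby
# Reproduces the offset arithmetic from #read with assumed example values
low_offset = 0
high_offset = 100   # next offset to be written; last message sits at 99
count = 10
start_offset = -1   # default: read the most recent messages

# Negative offsets move backwards from the high watermark
start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
start_offset = low_offset if start_offset.negative?

# First element sits on the start offset, so the range spans exactly count elements
requested_range = (start_offset..(start_offset + count - 1))   # 90..99
available_range = (low_offset..(high_offset - 1))              # 0..99
possible_range = requested_range.select { |offset| available_range.include?(offset) }

possible_range.first # => 90
possible_range.size  # => 10
```

Clamping against the available range is what makes requests near the low watermark safe: if retention has already removed some of the requested offsets, the range simply shrinks instead of the poll loop waiting for messages that no longer exist.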

#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Array<Integer, Integer>, Hash

Fetches the watermark offsets for a given topic partition or multiple topics and partitions

Examples:

Query single partition

Karafka::Admin::Topics.read_watermark_offsets('events', 0)
# => [0, 100]

Query specific partitions across multiple topics

Karafka::Admin::Topics.read_watermark_offsets(
  { 'events' => [0, 1], 'logs' => [0] }
)
# => {
#   'events' => {
#     0 => [0, 100],
#     1 => [0, 150]
#   },
#   'logs' => {
#     0 => [0, 50]
#   }
# }

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition number (required when first param is topic name)

Returns:

  • (Array<Integer, Integer>, Hash)

    when querying single partition returns array with low and high watermark offsets, when querying multiple returns nested hash



# File 'lib/karafka/admin/topics.rb', line 216

def read_watermark_offsets(name_or_hash, partition = nil)
  # Normalize input to hash format
  topics_with_partitions = partition ? { name_or_hash => [partition] } : name_or_hash

  result = Hash.new { |h, k| h[k] = {} }

  with_consumer do |consumer|
    topics_with_partitions.each do |topic, partitions|
      partitions.each do |partition_id|
        result[topic][partition_id] = consumer.query_watermark_offsets(topic, partition_id)
      end
    end
  end

  # Return single array for single partition query, hash for multiple
  partition ? result.dig(name_or_hash, partition) : result
end
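The normalization and result shaping can be exercised without a live consumer by stubbing `query_watermark_offsets`. A sketch of the same flow (the stub consumer and its fixed offsets are illustrative, not part of Karafka):

```ruby
# Stub consumer returning fixed watermark pairs, standing in for rdkafka
StubConsumer = Struct.new(:offsets) do
  def query_watermark_offsets(topic, partition_id)
    offsets.fetch([topic, partition_id])
  end
end

consumer = StubConsumer.new(
  {
    ['events', 0] => [0, 100],
    ['events', 1] => [0, 150]
  }
)

def read_watermark_offsets_sketch(consumer, name_or_hash, partition = nil)
  # Normalize the single-partition form into the hash form
  topics_with_partitions = partition ? { name_or_hash => [partition] } : name_or_hash

  result = Hash.new { |h, k| h[k] = {} }

  topics_with_partitions.each do |topic, partitions|
    partitions.each do |partition_id|
      result[topic][partition_id] = consumer.query_watermark_offsets(topic, partition_id)
    end
  end

  # Single-partition queries unwrap to a plain [low, high] pair
  partition ? result.dig(name_or_hash, partition) : result
end

read_watermark_offsets_sketch(consumer, 'events', 0)
# => [0, 100]
read_watermark_offsets_sketch(consumer, 'events' => [0, 1])
# => { 'events' => { 0 => [0, 100], 1 => [0, 150] } }
```

Normalizing both input forms into the hash shape up front keeps the query loop identical for either call style; only the final unwrapping differs.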