Module: Karafka::Admin

Defined in:
lib/karafka/admin.rb

Overview

Note:

It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, this should be fine.

Note:

It always uses the primary defined cluster and does not support multi-cluster work. The cluster on which operations are performed can be changed via the `admin.kafka` config, however there is no multi-cluster runtime support.

Simple admin actions that we can perform via Karafka on our Kafka cluster

Class Method Summary collapse

Class Method Details

.cluster_info ⇒ Rdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



# File 'lib/karafka/admin.rb', line 155

def cluster_info
  with_admin do |admin|
    admin.instance_variable_get('@native_kafka').with_inner do |inner|
      Rdkafka::Metadata.new(inner)
    end
  end
end
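
Example (a minimal sketch; the metadata accessors used below follow the typical Rdkafka::Metadata shape and are an assumption, not part of this doc):

info = Karafka::Admin.cluster_info

# Inspect brokers and list topic names from the cluster metadata (assumed shape)
puts info.brokers.inspect
puts info.topics.map { |topic| topic[:topic_name] }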

.create_partitions(name, partitions) ⇒ Object

Creates more partitions for a given topic

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with



# File 'lib/karafka/admin.rb', line 127

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { topic(name).fetch(:partition_count) >= partitions }
    )
  end
end
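
Example (a minimal sketch; the 'events' topic name and target count are illustrative):

# Expand the 'events' topic so that it ends up with 10 partitions in total
Karafka::Admin.create_partitions('events', 10)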

.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Creates Kafka topic with given settings

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: kafka.apache.org/documentation/#topicconfigs



# File 'lib/karafka/admin.rb', line 98

def create_topic(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { topics_names.include?(name) }
    )
  end
end
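
Example (a minimal sketch; the topic name and config values are illustrative):

# Create an 'events' topic with 5 partitions, replication factor 2 and
# a 7 day retention policy
Karafka::Admin.create_topic(
  'events',
  5,
  2,
  'retention.ms' => 7 * 24 * 60 * 60 * 1_000
)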

.delete_topic(name) ⇒ Object

Deletes a given topic

Parameters:

  • name (String)

    topic name



# File 'lib/karafka/admin.rb', line 112

def delete_topic(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) },
      -> { !topics_names.include?(name) }
    )
  end
end
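
Example (a minimal sketch; the topic name is illustrative):

# Remove the 'events' topic from the cluster
Karafka::Admin.delete_topic('events')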

.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>

Allows us to read messages from the topic

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start. If -1 is provided (default) we will start from the latest offset. If time is provided, the appropriate offset will be resolved.

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

Returns:

  • (Array<Karafka::Messages::Message>)

    array with the messages read from the given topic partition

# File 'lib/karafka/admin.rb', line 25

def read_topic(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less
    start_offset = high_offset - count if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..start_offset + (count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related to
    # log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offset that we can select. This will remove all the potential offsets that
    # are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.count

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
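
Example (a minimal sketch; the topic name is illustrative and the message readers follow the standard Karafka::Messages::Message API):

# Fetch up to 10 of the most recent messages from partition 0 of 'events'
messages = Karafka::Admin.read_topic('events', 0, 10)

messages.each do |message|
  puts "#{message.offset}: #{message.raw_payload}"
end

# A Time can be used as the start offset to read from a point in time
Karafka::Admin.read_topic('events', 0, 10, Time.now - 60 * 60)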

.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>

Fetches the watermark offsets for a given topic partition

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

Returns:

  • (Array<Integer, Integer>)

    low watermark offset and high watermark offset



# File 'lib/karafka/admin.rb', line 143

def read_watermark_offsets(name, partition)
  with_consumer do |consumer|
    # For newly created topics or in cases where we're trying to get them but there is no
    # leader, this can fail. It happens more often for new topics under KRaft, however we
    # still want to make sure things operate as expected even then
    with_rdkafka_retry(codes: %i[not_leader_for_partition]) do
      consumer.query_watermark_offsets(name, partition)
    end
  end
end
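
Example (a minimal sketch; the topic name is illustrative):

low, high = Karafka::Admin.read_watermark_offsets('events', 0)

# Rough number of messages currently stored in this partition
# (ignores gaps caused by compaction or transactional markers)
puts high - low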

.with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.

Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



# File 'lib/karafka/admin.rb', line 170

def with_consumer(settings = {})
  consumer = config(:consumer, settings).consumer
  proxy = ::Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close
end
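
Example (a minimal sketch; the topic name is illustrative):

# Use the low-level consumer directly, e.g. to query watermark offsets
Karafka::Admin.with_consumer do |consumer|
  puts consumer.query_watermark_offsets('events', 0).inspect
end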