Class: Kafka::FFI::TopicConfig

Inherits:
OpaquePointer
Defined in:
lib/kafka/ffi/topic_config.rb

Overview

TopicConfig can be passed to Topic.new to configure how the client interacts with the Topic.

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary

Instance Method Summary

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::OpaquePointer

Class Method Details

.new ⇒ Object



# File 'lib/kafka/ffi/topic_config.rb', line 10

def self.new
  Kafka::FFI.rd_kafka_topic_conf_new
end

Instance Method Details

#destroy ⇒ Object

Note:

Never call #destroy on a TopicConfig that has been passed to Kafka::FFI.rd_kafka_topic_new, since the topic handle takes ownership of the config.

Free all resources used by the topic config.



# File 'lib/kafka/ffi/topic_config.rb', line 120

def destroy
  if !pointer.null?
    ::Kafka::FFI.rd_kafka_topic_conf_destroy(self)
  end
end

#dup ⇒ TopicConfig

Duplicate the current config.

Returns:

  • (TopicConfig)



# File 'lib/kafka/ffi/topic_config.rb', line 80

def dup
  ::Kafka::FFI.rd_kafka_topic_conf_dup(self)
end

#get(key) ⇒ String, :unknown

Get the current config value for the given key.

Parameters:

  • key (String)

    Config key to fetch the setting for.

Returns:

  • (String, :unknown)

    Value for the key or :unknown if not already set.



# File 'lib/kafka/ffi/topic_config.rb', line 50

def get(key)
  key = key.to_s

  # Will contain the size of the value at key
  size = ::FFI::MemoryPointer.new(:size_t)

  # Make an initial request for the size of buffer we need to allocate.
  # When trying to make a guess at the potential size the code would often
  # segfault due to rd_kafka_conf_get reallocating the buffer.
  err = ::Kafka::FFI.rd_kafka_topic_conf_get(self, key, ::FFI::Pointer::NULL, size)
  if err != :ok
    return err
  end

  # Allocate a string long enough to contain the whole value.
  value = ::FFI::MemoryPointer.new(:char, size.read(:size_t))
  err = ::Kafka::FFI.rd_kafka_topic_conf_get(self, key, value, size)
  if err != :ok
    return err
  end

  value.read_string
ensure
  size.free if size
  value.free if value
end
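
Because #get returns either a String value or a Symbol status such as :unknown, callers usually need to tell the two apart. The following is a minimal sketch of one way to do that. `FakeTopicConfig` and `config_value` are hypothetical names introduced only for illustration; the stand-in class mimics the documented return contract so the example runs without librdkafka.

```ruby
# Hypothetical stand-in for Kafka::FFI::TopicConfig. It only mimics the
# documented contract of #get: a String value, or :unknown when nothing is set.
class FakeTopicConfig
  def initialize(values)
    @values = values
  end

  def get(key)
    @values.fetch(key.to_s, :unknown)
  end
end

# Hypothetical helper: return the configured value, or `default` when #get
# reports a Symbol status instead of a String.
def config_value(config, key, default = nil)
  value = config.get(key)
  value.is_a?(String) ? value : default
end

config = FakeTopicConfig.new("request.required.acks" => "-1")

acks    = config_value(config, "request.required.acks")   # a String, returned as is
missing = config_value(config, "no.such.key", "fallback") # :unknown, so the default is used
```

Checking for a String rather than comparing against :unknown also covers any other status Symbol that the underlying call might return.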

#set(key, value) ⇒ Object

Set the config option at `key` to `value`. The configuration options match those used by librdkafka (and the Java client).

Parameters:

  • key (String)

    Configuration key

  • value (String)

    Value to set

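Raises:

  • (Kafka::FFI::UnknownConfigKey)

    If librdkafka does not recognize the key.

  • (Kafka::FFI::InvalidConfigValue)

    If the value is not valid for the key.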

See Also:



# File 'lib/kafka/ffi/topic_config.rb', line 24

def set(key, value)
  key = key.to_s
  value = value.to_s

  error = ::FFI::MemoryPointer.new(:char, 512)
  result = ::Kafka::FFI.rd_kafka_topic_conf_set(self, key, value, error, error.size)

  # See config_result enum in ffi.rb
  case result
  when :ok
    nil
  when :unknown
    raise Kafka::FFI::UnknownConfigKey.new(key, value, error.read_string)
  when :invalid
    raise Kafka::FFI::InvalidConfigValue.new(key, value, error.read_string)
  end
ensure
  error.free if error
end
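
Since #set converts both key and value with to_s, a Hash of options can be applied in a loop. The sketch below shows that pattern under stated assumptions: `RecordingConfig` and `apply_options` are hypothetical names, and the stand-in class only records calls, whereas the real #set forwards to rd_kafka_topic_conf_set and raises on unknown keys or invalid values.

```ruby
# Hypothetical stand-in that records what was set, so the sketch runs
# without librdkafka. It stringifies key and value like the real #set does.
class RecordingConfig
  attr_reader :applied

  def initialize
    @applied = {}
  end

  def set(key, value)
    @applied[key.to_s] = value.to_s
    nil
  end
end

# Hypothetical helper: apply every pair in `options` through #set and
# return the config so calls can be chained.
def apply_options(config, options)
  options.each { |key, value| config.set(key, value) }
  config
end

options = { "request.timeout.ms" => 5000, :partitioner => "murmur2" }
config = apply_options(RecordingConfig.new, options)
```

With the real class, the first failing pair would raise and abort the loop, so a caller that wants to report every bad option would need to rescue inside the block.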

#set_partitioner_cb {|topic, key, partition_count| ... } ⇒ Object

Sets a custom partitioner callback that is called for each message to determine which partition to publish the message to.

Examples:

Random partitioner

set_partitioner_cb do |_topic, _key, parts|
  rand(parts)
end

Yields:

  • (topic, key, partition_count)

Yield Parameters:

  • topic (Topic)

    Topic the message is being published to

  • key (String)

    Partitioning key provided when publishing

  • partition_count (Integer)

    Number of partitions the topic has

Yield Returns:

  • (Integer)

    The partition to publish the message to

See Also:

  • config option for predefined strategies


# File 'lib/kafka/ffi/topic_config.rb', line 99

def set_partitioner_cb
  if !block_given?
    raise ArgumentError, "set_partitioner_cb must be called with a block"
  end

  # @todo How do we guarantee the block does not get garbage collected?
  # @todo Support opaque pointers?
  cb = ::FFI::Function.new(:int, [:pointer, :string, :size_t, :int32, :pointer, :pointer]) do |topic, key, _, partitions, _, _|
    topic = Topic.new(topic)

    yield(topic, key, partitions)
  end

  ::Kafka::FFI.rd_kafka_topic_conf_set_partitioner_cb(self, cb)
end
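
As a complement to the random partitioner example, a key-based partitioner keeps all messages with the same key on the same partition. This is a rough sketch, not the library's own strategy: `crc32_partition` is a hypothetical helper, and falling back to a random partition for missing keys is an assumption made here for illustration.

```ruby
require "zlib"

# Hypothetical helper, not part of Kafka::FFI: map a key to a stable
# partition in the range 0...partition_count.
def crc32_partition(key, partition_count)
  # Messages without a key have nothing to hash, so spread them randomly.
  return rand(partition_count) if key.nil? || key.empty?

  Zlib.crc32(key) % partition_count
end

# Intended wiring, assuming `config` is a Kafka::FFI::TopicConfig:
#
#   config.set_partitioner_cb do |_topic, key, partition_count|
#     crc32_partition(key, partition_count)
#   end
```

Keeping the hashing logic in a plain method makes it possible to exercise it without a broker or a native callback.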