Class: Kafka::FFI::Config

Inherits:
OpaquePointer show all
Defined in:
lib/kafka/ffi/config.rb

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from OpaquePointer

by_ref, from_native, inherited, to_native

Constructor Details

#initialize(ptr) ⇒ Config

Returns a new instance of Config.



12
13
14
15
16
17
18
# File 'lib/kafka/ffi/config.rb', line 12

# Build a Config around the given pointer.
#
# @param ptr pointer handed on to the parent class initializer
def initialize(ptr)
  # Bare `super` forwards `ptr` to the parent initializer unchanged.
  super

  # Every callback that gets set is also stored in this Hash so that it
  # stays referenced and is not garbage collected.
  @callbacks = {}
end

Class Method Details

.new ⇒ Object



8
9
10
# File 'lib/kafka/ffi/config.rb', line 8

# Create a config by delegating to Kafka::FFI.rd_kafka_conf_new.
#
# @return whatever rd_kafka_conf_new returns
def self.new
  ::Kafka::FFI.rd_kafka_conf_new
end

Instance Method Details

#destroy ⇒ Object

Note:

Never call #destroy on a Config that has been passed to Kafka::FFI.rd_kafka_new since the handle will take ownership of the config.

Free all resources used by the config.



376
377
378
379
380
# File 'lib/kafka/ffi/config.rb', line 376

# Free all resources used by the config.
#
# Does nothing (and returns nil) when the wrapped pointer is null.
#
# @note Never call this on a Config that has been passed to
#   Kafka::FFI.rd_kafka_new since the handle takes ownership of the config.
def destroy
  return if pointer.null?

  ::Kafka::FFI.rd_kafka_conf_destroy(self)
end

#dup ⇒ Config

Duplicate the current config

Returns:

  • (Config)

    Duplicated config



86
87
88
# File 'lib/kafka/ffi/config.rb', line 86

# Duplicate the current config.
#
# @return [Config] Duplicated config, as returned by rd_kafka_conf_dup
def dup
  copy = ::Kafka::FFI.rd_kafka_conf_dup(self)
  copy
end

#dup_filter(*filter) ⇒ Object

Duplicate the config but do not copy any config options that match the filtered keys.



92
93
94
95
96
97
98
99
100
101
102
# File 'lib/kafka/ffi/config.rb', line 92

# Duplicate the config but do not copy any config options that match the
# filtered keys.
#
# @param filter [Array<String, Symbol>] Config keys to leave out of the
#   copy. Each key is converted with #to_s before being handed to
#   librdkafka.
#
# @return the result of rd_kafka_conf_dup_filter
def dup_filter(*filter)
  # The native strings are held in a local for the whole call. If they only
  # lived in a temporary array they could be garbage collected (and their
  # memory released) before rd_kafka_conf_dup_filter reads them.
  strings = filter.map { |name| ::FFI::MemoryPointer.from_string(name.to_s) }

  ptr = ::FFI::MemoryPointer.new(:pointer, filter.length)
  ptr.write_array_of_pointer(strings)

  ::Kafka::FFI.rd_kafka_conf_dup_filter(self, filter.length, ptr)
ensure
  # Either allocation may have raised, leaving its local nil.
  ptr.free if ptr
  strings.each(&:free) if strings
end

#get(key) ⇒ String, :unknown

Get the current config value for the given key.

Parameters:

  • key (String)

    Config key to fetch the setting for.

Returns:

  • (String, :unknown)

    Value for the key or :unknown if not already set.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/kafka/ffi/config.rb', line 56

# Get the current config value for the given key.
#
# @param key [String] Config key to fetch the setting for (converted with
#   #to_s).
#
# @return [String, :unknown] Value for the key. When rd_kafka_conf_get
#   reports anything other than :ok that result is returned instead
#   (documented as :unknown for a key that is not set).
def get(key)
  name = key.to_s

  # Receives the size of the value stored at the key.
  len = ::FFI::MemoryPointer.new(:size_t)

  # First ask only for the required buffer size by passing a NULL buffer.
  # Guessing at a size up front often segfaulted because rd_kafka_conf_get
  # reallocated the buffer.
  status = ::Kafka::FFI.rd_kafka_conf_get(self, name, ::FFI::Pointer::NULL, len)
  return status unless status == :ok

  # Now allocate a buffer long enough to contain the whole value.
  buf = ::FFI::MemoryPointer.new(:char, len.read(:size_t))
  status = ::Kafka::FFI.rd_kafka_conf_get(self, name, buf, len)
  return status unless status == :ok

  buf.read_string
ensure
  # Either pointer may be nil if we bailed out before allocating it.
  [len, buf].compact.each(&:free)
end

#set(key, value) ⇒ Object

Set the config option at `key` to `value`. The configuration options match those used by librdkafka (and the Java client).

Parameters:

  • key (String)

    Configuration key

  • value (String)

    Value to set

Raises:

See Also:



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/kafka/ffi/config.rb', line 30

# Set the config option at `key` to `value`.
#
# @param key [String] Configuration key (converted with #to_s)
# @param value [String] Value to set (converted with #to_s)
#
# @raise [Kafka::FFI::UnknownConfigKey] rd_kafka_conf_set returned :unknown
# @raise [Kafka::FFI::InvalidConfigValue] rd_kafka_conf_set returned
#   :invalid
#
# @return [nil]
def set(key, value)
  name = key.to_s
  setting = value.to_s

  # Buffer that receives the error description on failure.
  errstr = ::FFI::MemoryPointer.new(:char, 512)
  status = ::Kafka::FFI.rd_kafka_conf_set(self, name, setting, errstr, errstr.size)

  # See config_result enum in ffi.rb
  if status == :unknown
    raise Kafka::FFI::UnknownConfigKey.new(name, setting, errstr.read_string)
  elsif status == :invalid
    raise Kafka::FFI::InvalidConfigValue.new(name, setting, errstr.read_string)
  end

  nil
ensure
  errstr.free if errstr
end

#set_background_event_cb {|client, event, opaque| ... } ⇒ Object Also known as: background_event_cb=

Note:

The application is responsible for calling #destroy on the event.

Note:

The application must not call #destroy on the Client inside the callback.

Set the callback that will be used for events published to the background queue. This enables a background thread that runs internal to librdkafka and can be used as a standard receiver for APIs that take a queue.

Yields:

  • (client, event, opaque)

    Called when an event is received by the queue.

Yield Parameters:

  • client (Client)

    Kafka Client for the event

  • event (Event)

    The event that occurred

  • opaque (::FFI::Pointer)

    Pointer to the configuration’s opaque pointer that was set via set_opaque.

See Also:



153
154
155
156
# File 'lib/kafka/ffi/config.rb', line 153

# Set the callback used for events published to the background queue.
#
# @yield [client, event, opaque] Called when an event is received by the
#   queue.
def set_background_event_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:background_event_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_background_event_cb(self, &block)
end

#set_closesocket_cb(&block) ⇒ Object Also known as: closesocket_cb=



316
317
318
319
# File 'lib/kafka/ffi/config.rb', line 316

# Register the block through rd_kafka_conf_set_closesocket_cb.
def set_closesocket_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:closesocket_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_closesocket_cb(self, &block)
end

#set_connect_cb(&block) ⇒ Object Also known as: connect_cb=



310
311
312
313
# File 'lib/kafka/ffi/config.rb', line 310

# Register the block through rd_kafka_conf_set_connect_cb.
def set_connect_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:connect_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_connect_cb(self, &block)
end

#set_consume_cb {|message, opaque| ... } ⇒ Object Also known as: consume_cb=

Note:

Consumer only

Set consume callback for use with consumer_poll.

Yields:

  • (message, opaque)

Yield Parameters:

  • message (Message)
  • opaque (::FFI::Pointer)


186
187
188
189
# File 'lib/kafka/ffi/config.rb', line 186

# Set consume callback for use with consumer_poll.
#
# @note Consumer only
#
# @yield [message, opaque]
# @yieldparam message [Message]
# @yieldparam opaque [::FFI::Pointer]
def set_consume_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:consume_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_consume_cb(self, &block)
end

#set_dr_msg_cb {|client, message, opaque| ... } ⇒ Object Also known as: dr_msg_cb=

Note:

Producer only

Set delivery report callback for the config. The delivery report callback will be called once for each message accepted by Producer#produce. The Message will have #error set in the event of a producer error.

The callback is called when a message is successfully produced or if librdkafka encountered a permanent failure.

Yields:

  • (client, message, opaque)

    Called for each Message produced.

Yield Parameters:

  • client (Client)

    Kafka Client for the event

  • message (Message)

    Message that was produced

  • opaque (::FFI::Pointer)

    Pointer to the configuration’s opaque pointer that was set via set_opaque.



173
174
175
176
# File 'lib/kafka/ffi/config.rb', line 173

# Set the delivery report callback for the config.
#
# @note Producer only
#
# @yield [client, message, opaque] Called for each Message produced.
# @yieldparam client [Client] Kafka Client for the event
# @yieldparam message [Message] Message that was produced
# @yieldparam opaque [::FFI::Pointer]
def set_dr_msg_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:dr_msg_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_dr_msg_cb(self, &block)
end

#set_error_cb {|client, error, reason, opaque| ... } ⇒ Object Also known as: error_cb=

Set error callback that is used by librdkafka to signal warnings and errors back to the application. These errors should generally be considered informational and non-permanent; librdkafka will try to recover from all types of errors.

Yields:

  • (client, error, reason, opaque)

Yield Parameters:

  • client (Client)
  • error (RD_KAFKA_RESP_ERR__FATAL)

    Fatal error occurred

  • error (Integer)

    Other error occurred

  • reason (String)
  • opaque (::FFI::Pointer)


243
244
245
246
# File 'lib/kafka/ffi/config.rb', line 243

# Set the callback librdkafka uses to signal warnings and errors back to
# the application.
#
# @yield [client, error, reason, opaque]
# @yieldparam client [Client]
# @yieldparam reason [String]
# @yieldparam opaque [::FFI::Pointer]
def set_error_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:error_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_error_cb(self, &block)
end

#set_events(events_mask) ⇒ Object

Enable event sourcing. Convenience method to set the `enabled_events` option as an integer.

Examples:

Set events using event symbol names

config.set_events([ :delivery, :log, :fetch ])

Set events using event constants

config.set_events([ RD_KAFKA_EVENT_DR, RD_KAFKA_EVENT_LOG ])

Parameters:

  • events_mask (Integer, Array<Symbol, Integer>)

    Bitmask of events to enable during queue poll.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/kafka/ffi/config.rb', line 118

# Enable event sourcing by passing an events bitmask to
# rd_kafka_conf_set_events.
#
# @example Set events using event symbol names
#   config.set_events([ :delivery, :log, :fetch ])
#
# @example Set events using event constants
#   config.set_events([ RD_KAFKA_EVENT_DR, RD_KAFKA_EVENT_LOG ])
#
# @param events_mask [Integer, Array<Symbol, Integer>] Bitmask of events to
#   enable during queue poll. An Array is OR-ed together into one mask;
#   anything that is not an Array is passed through untouched.
def set_events(events_mask)
  mask =
    if events_mask.is_a?(Array)
      types = ::Kafka::FFI.enum_type(:event_type)

      events_mask.reduce(0) do |bits, event|
        case event
        when Integer then bits | event
        # Symbols the :event_type enum does not know contribute nothing.
        when Symbol  then bits | (types[event] || 0)
        # Entries of any other type are skipped.
        else bits
        end
      end
    else
      events_mask
    end

  ::Kafka::FFI.rd_kafka_conf_set_events(self, mask)
end

#set_log_cb {|client, level, facility, message| ... } ⇒ Object Also known as: log_cb=

Note:

The application MUST NOT call any librdkafka APIs or do any prolonged work in a log_cb unless logs have been forwarded to a queue via set_log_queue.

Set the logging callback. By default librdkafka will print to stderr (or syslog if configured).

Yields:

  • (client, level, facility, message)

Yield Parameters:

  • client (Client)
  • level (Integer)

    Log level

  • facility (String)

    Log facility

  • message (String)

    Log message



276
277
278
279
# File 'lib/kafka/ffi/config.rb', line 276

# Set the logging callback.
#
# @note The application MUST NOT call any librdkafka APIs or do any
#   prolonged work in a log_cb unless logs have been forwarded to a queue
#   via set_log_queue.
#
# @yield [client, level, facility, message]
# @yieldparam client [Client]
# @yieldparam level [Integer] Log level
# @yieldparam facility [String] Log facility
# @yieldparam message [String] Log message
def set_log_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:log_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_log_cb(self, &block)
end

#set_oauthbearer_token_refresh_cb(&block) ⇒ Object Also known as: oauthbearer_token_refresh_cb=



298
299
300
301
# File 'lib/kafka/ffi/config.rb', line 298

# Register the block through
# rd_kafka_conf_set_oauthbearer_token_refresh_cb.
def set_oauthbearer_token_refresh_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:oauthbearer_token_refresh_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_oauthbearer_token_refresh_cb(self, &block)
end

#set_offset_commit_cb {|client, error, offsets| ... } ⇒ Object Also known as: offset_commit_cb=

Note:

Consumer only

Set offset commit callback which is called when offsets are committed by the consumer.

Yields:

  • (client, error, offsets)

Yield Parameters:



226
227
228
229
# File 'lib/kafka/ffi/config.rb', line 226

# Set offset commit callback which is called when offsets are committed by
# the consumer.
#
# @note Consumer only
#
# @yield [client, error, offsets]
def set_offset_commit_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:offset_commit_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_offset_commit_cb(self, &block)
end

#set_open_cb(&block) ⇒ Object Also known as: open_cb=



322
323
324
325
326
327
328
329
# File 'lib/kafka/ffi/config.rb', line 322

# Register the block through rd_kafka_conf_set_open_cb.
#
# @raise [Error] when running on Windows; nothing is stored or registered
#   in that case.
def set_open_cb(&block)
  raise Error, "set_open_cb is not available on Windows" if ::FFI::Platform.windows?

  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:open_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_open_cb(self, &block)
end

#set_rebalance_cb {|client, error, partitions, opaque| ... } ⇒ Object Also known as: rebalance_cb=

Note:

Consumer only

Set rebalance callback for use with consumer group balancing. Setting the rebalance callback will turn off librdkafka’s automatic handling of assignment/revocation and delegates the responsibility to the application’s callback.

Yields:

  • (client, error, partitions, opaque)

Yield Parameters:

See Also:

  • rd_kafka_conf_set_rebalance_cb


209
210
211
212
# File 'lib/kafka/ffi/config.rb', line 209

# Set rebalance callback for use with consumer group balancing.
#
# @note Consumer only
#
# @yield [client, error, partitions, opaque]
#
# @see rd_kafka_conf_set_rebalance_cb
def set_rebalance_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:rebalance_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_rebalance_cb(self, &block)
end

#set_socket_cb(&block) ⇒ Object Also known as: socket_cb=



304
305
306
307
# File 'lib/kafka/ffi/config.rb', line 304

# Register the block through rd_kafka_conf_set_socket_cb.
def set_socket_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:socket_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_socket_cb(self, &block)
end

#set_ssl_cert(cert_type, cert_enc, certificate) ⇒ Object Also known as: ssl_cert=

Note:

The private key may require a password which must be specified with the `ssl.key.password` property prior to calling this function.

Note:

Private and public keys, in PEM format, can be set with the `ssl.key.pem` and `ssl.certificate.pem` configuration properties.

Set the certificate for secure communication with the Kafka cluster.

Parameters:

  • cert_type (:public, :private, :ca)
  • cert_enc (:pkcs12, :der, :pem)
  • certificate (String)

    Encoded certificate

  • certificate (nil)

    Clear the stored certificate

Raises:

  • (ConfigError)

    Certificate was not properly encoded or librdkafka was not compiled with SSL/TLS.



355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/kafka/ffi/config.rb', line 355

# Set the certificate for secure communication with the Kafka cluster.
#
# @note The private key may require a password which must be specified
#   with the `ssl.key.password` property prior to calling this function.
#
# @param cert_type [:public, :private, :ca]
# @param cert_enc [:pkcs12, :der, :pem]
# @param certificate [String, nil] Encoded certificate, or nil to clear the
#   stored certificate.
#
# @raise [ConfigError] rd_kafka_conf_set_ssl_cert returned anything other
#   than :ok. The message carries the error text written by librdkafka.
#
# @return [nil]
def set_ssl_cert(cert_type, cert_enc, certificate)
  error = ::FFI::MemoryPointer.new(:char, 512)

  # nil is the documented way to clear the stored certificate, so it must
  # not be asked for its bytesize; report an empty buffer instead.
  size = certificate ? certificate.bytesize : 0

  # The config itself is the first argument, as with every other
  # rd_kafka_conf_* call made by this class.
  err = ::Kafka::FFI.rd_kafka_conf_set_ssl_cert(self, cert_type, cert_enc, certificate, size, error, error.size)
  if err != :ok
    # Property name isn't exact since this appears to have some routing
    # based on cert type to determine the exact key.
    #
    # The error text goes into the message: a third argument to `raise`
    # would be taken as the backtrace rather than handed to ConfigError.
    # NOTE(review): ConfigError is still built from a single argument, as
    # before — confirm against its constructor.
    raise ConfigError, "ssl_cert: #{error.read_string}"
  end

  nil
ensure
  # MemoryPointer.new may itself have raised, leaving `error` nil.
  error.free if error
end

#set_ssl_cert_verify_cb(&block) ⇒ Object Also known as: ssl_cert_verify_cb=



332
333
334
335
# File 'lib/kafka/ffi/config.rb', line 332

# Register the block through rd_kafka_conf_set_ssl_cert_verify_cb.
def set_ssl_cert_verify_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:ssl_cert_verify_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_ssl_cert_verify_cb(self, &block)
end

#set_stats_cb {|client, json, json_len, opaque| ... } ⇒ Object Also known as: stats_cb=

Set statistics callback that is triggered every `statistics.interval.ms` with a JSON document containing connection statistics.

Yields:

  • (client, json, json_len, opaque)

Yield Parameters:

  • client (Client)
  • json (String)

    Statistics payload

  • json_len (Integer)

    Length of the JSON payload

  • opaque (::FFI::Pointer)

See Also:



292
293
294
295
# File 'lib/kafka/ffi/config.rb', line 292

# Set statistics callback that is triggered every `statistics.interval.ms`
# with a JSON document containing connection statistics.
#
# @yield [client, json, json_len, opaque]
# @yieldparam client [Client]
# @yieldparam json [String] Statistics payload
# @yieldparam json_len [Integer] Length of the JSON payload
# @yieldparam opaque [::FFI::Pointer]
def set_stats_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:stats_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_stats_cb(self, &block)
end

#set_throttle_cb {|client, broker_name, broker_id, throttle_ms, opaque| ... } ⇒ Object Also known as: throttle_cb=

Set throttle callback that is used to forward broker throttle times to the application.

Yields:

  • (client, broker_name, broker_id, throttle_ms, opaque)

Yield Parameters:

  • client (Client)
  • broker_name (String)
  • broker_id (Integer)
  • throttle_ms (Integer)

    Throttle time in milliseconds

  • opaque (::FFI::Pointer)


258
259
260
261
# File 'lib/kafka/ffi/config.rb', line 258

# Set throttle callback that is used to forward broker throttle times to
# the application.
#
# @yield [client, broker_name, broker_id, throttle_ms, opaque]
# @yieldparam client [Client]
# @yieldparam broker_name [String]
# @yieldparam broker_id [Integer]
# @yieldparam throttle_ms [Integer] Throttle time in milliseconds
# @yieldparam opaque [::FFI::Pointer]
def set_throttle_cb(&block)
  # Stored so the callback stays referenced and is not garbage collected.
  @callbacks.store(:throttle_cb, block)

  ::Kafka::FFI.rd_kafka_conf_set_throttle_cb(self, &block)
end