Class: Rdkafka::Config

Inherits:
Object
Defined in:
lib/rdkafka/config.rb

Overview

Configuration for a Kafka consumer or producer. Create an instance, then call its consumer or producer method to create a client. The available configuration options are documented at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Defined Under Namespace

Classes: ClientCreationError, ConfigError, NoLoggerError

Constant Summary

DEFAULT_CONFIG =

Default config that can be overwritten.

{
  # Request api version so advanced features work
  :"api.version.request" => true
}.freeze

REQUIRED_CONFIG =

Required config that cannot be overwritten.

{
  # Enable log queues so we get callbacks in our own Ruby threads
  :"log.queue" => true
}.freeze

Constructor Details

#initialize(config_hash = {}) ⇒ Config

Returns a new config in which the provided options are merged over DEFAULT_CONFIG, so a provided option replaces a default with the same key.

Parameters:

  • config_hash (Hash{String,Symbol => String}) (defaults to: {})

    The config options for rdkafka


# File 'lib/rdkafka/config.rb', line 73

def initialize(config_hash = {})
  @config_hash = DEFAULT_CONFIG.merge(config_hash)
  @consumer_rebalance_listener = nil
end
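
The merge performed by the constructor can be illustrated with plain hashes. This is a minimal sketch that copies the DEFAULT_CONFIG value shown above into a local variable and does not load the rdkafka gem; the option values are placeholders chosen for illustration.

```ruby
# Local copy of DEFAULT_CONFIG from the Constant Summary, so the sketch runs standalone.
default_config = {
  :"api.version.request" => true
}.freeze

# Sample options (placeholder values, not recommendations).
user_options = {
  :"bootstrap.servers" => "localhost:9092",
  :"group.id" => "example-group",
  :"api.version.request" => false
}

# Same expression as in #initialize: entries from the argument win over defaults.
merged = default_config.merge(user_options)

puts merged[:"api.version.request"].inspect # the default was replaced
puts merged.frozen?.inspect                 # merge returns a new, unfrozen hash
```

Because the merged hash is a new object, later calls to #[]= can modify it even though DEFAULT_CONFIG itself is frozen.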

Class Method Details

.logger ⇒ Logger

Returns the current logger. By default this is a logger that writes to stdout.

Returns:

  • (Logger)

# File 'lib/rdkafka/config.rb', line 18

def self.logger
  @@logger
end

.logger=(logger) ⇒ nil

Set the logger that will be used for all logging output by this library.

Parameters:

  • logger (Logger)

    The logger to be used

Returns:

  • (nil)

Raises:

  • (NoLoggerError)

# File 'lib/rdkafka/config.rb', line 27

def self.logger=(logger)
  raise NoLoggerError if logger.nil?
  @@logger = logger
end
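
As a usage sketch, the setter's nil guard can be exercised with a small stand-in class. `SketchConfig` and its nested `NoLoggerError` are hypothetical names that copy the two class methods shown above so the example runs without the rdkafka gem; with the gem loaded, the same calls would go to `Rdkafka::Config`.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in mirroring the .logger / .logger= source shown above.
class SketchConfig
  class NoLoggerError < RuntimeError; end

  @@logger = Logger.new($stdout)

  def self.logger
    @@logger
  end

  def self.logger=(logger)
    raise NoLoggerError if logger.nil?
    @@logger = logger
  end
end

# Send library output to an in-memory buffer instead of stdout.
buffer = StringIO.new
SketchConfig.logger = Logger.new(buffer)
SketchConfig.logger.info("rdkafka log line")

# Passing nil is rejected, and the previously set logger stays in place.
begin
  SketchConfig.logger = nil
rescue SketchConfig::NoLoggerError
  SketchConfig.logger.warn("logger was not replaced")
end

puts buffer.string
```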

.statistics_callback ⇒ Proc?

Returns the current statistics callback. By default this is nil.

Returns:

  • (Proc, nil)

# File 'lib/rdkafka/config.rb', line 47

def self.statistics_callback
  @@statistics_callback
end

.statistics_callback=(callback) ⇒ nil

Set a callback that is called every time the underlying client emits statistics. Whether and how often this happens is controlled by the statistics.interval.ms option. The callback receives a hash whose contents are documented at https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md.

Parameters:

  • callback (Proc)

    The callback

Returns:

  • (nil)

Raises:

  • (TypeError)

# File 'lib/rdkafka/config.rb', line 39

def self.statistics_callback=(callback)
  raise TypeError.new("Callback has to be a proc or lambda") unless callback.is_a? Proc
  @@statistics_callback = callback
end
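
The `is_a? Proc` check above accepts lambdas and procs but not `Method` objects. The sketch below shows this with plain Ruby and then calls a handler by hand. The sample hash is made up for illustration, and the string keys `"name"` and `"msg_cnt"` assume the statistics arrive as parsed JSON using the field names listed in STATISTICS.md.

```ruby
# A handler written as a lambda. With the gem loaded it would be assigned with
#   Rdkafka::Config.statistics_callback = on_stats
seen = []
on_stats = lambda do |stats|
  seen << "#{stats["name"]}: #{stats["msg_cnt"]} messages queued"
end

# A plain method, used only to obtain a Method object for the type check.
def log_stats(stats)
  stats
end

# Lambdas and procs pass the type check; a Method object does not
# until it is converted with #to_proc.
checks = {
  from_lambda: on_stats.is_a?(Proc),
  from_proc: proc { |stats| stats }.is_a?(Proc),
  from_method: method(:log_stats).is_a?(Proc),
  from_method_to_proc: method(:log_stats).to_proc.is_a?(Proc)
}

# Made-up payload standing in for what the client would pass.
on_stats.call({ "name" => "rdkafka#producer-1", "msg_cnt" => 3 })

puts checks.inspect
puts seen.first
```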

Instance Method Details

#[](key) ⇒ String?

Get the config option with the specified key.

Parameters:

  • key (String)

    The config option's key

Returns:

  • (String, nil)

    The config option or nil if it is not present


# File 'lib/rdkafka/config.rb', line 93

def [](key)
  @config_hash[key]
end

#[]=(key, value) ⇒ nil

Set a config option.

Parameters:

  • key (String)

    The config option's key

  • value (String)

    The config option's value

Returns:

  • (nil)

# File 'lib/rdkafka/config.rb', line 84

def []=(key, value)
  @config_hash[key] = value
end
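
Because both accessors index the underlying hash directly, a value is found only under the exact key object it was stored with, so `"group.id"` and `:"group.id"` are separate entries. A minimal sketch, using a hypothetical `HashBackedConfig` class that copies the constructor and accessors shown above so it runs without the rdkafka gem:

```ruby
# Hypothetical stand-in mirroring #initialize, #[] and #[]= from the source above.
class HashBackedConfig
  DEFAULT_CONFIG = { :"api.version.request" => true }.freeze

  def initialize(config_hash = {})
    @config_hash = DEFAULT_CONFIG.merge(config_hash)
  end

  def [](key)
    @config_hash[key]
  end

  def []=(key, value)
    @config_hash[key] = value
  end
end

config = HashBackedConfig.new({ :"bootstrap.servers" => "localhost:9092" })
config[:"group.id"] = "example-group"

puts config[:"group.id"].inspect            # stored under the Symbol key
puts config["group.id"].inspect             # the String key was never set
puts config[:"api.version.request"].inspect # default carried over by the merge
```

Picking one key type and using it consistently avoids lookups that silently return nil.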

#consumer ⇒ Consumer

Create a consumer with this configuration.

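Returns:

  • (Consumer)

Raises:

  • (ConfigError)

    When the configuration contains invalid options

  • (ClientCreationError)

    When the native client cannot be created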

# File 'lib/rdkafka/config.rb', line 110

def consumer
  opaque = Opaque.new
  config = native_config(opaque)

  if @consumer_rebalance_listener
    opaque.consumer_rebalance_listener = @consumer_rebalance_listener
    Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
  end

  kafka = native_kafka(config, :rd_kafka_consumer)

  # Redirect the main queue to the consumer
  Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)

  # Return consumer with Kafka client
  Rdkafka::Consumer.new(kafka)
end

#consumer_rebalance_listener=(listener) ⇒ Object

Set a listener that receives notifications on partition assignment and revocation for the subscribed topics.

Parameters:

  • listener (Object, #on_partitions_assigned, #on_partitions_revoked)

    listener instance


# File 'lib/rdkafka/config.rb', line 100

def consumer_rebalance_listener=(listener)
  @consumer_rebalance_listener = listener
end
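
A listener can be any object that responds to the two methods named above. The sketch below is a guess at a typical shape: the `(consumer, list)` argument pair is an assumption that this page does not state, `RecordingListener` is a hypothetical name, and the callbacks are invoked by hand with placeholder values so the example runs without a broker or the rdkafka gem.

```ruby
# Hypothetical listener; the (consumer, list) parameters are assumed.
class RecordingListener
  attr_reader :events

  def initialize
    @events = []
  end

  def on_partitions_assigned(_consumer, list)
    @events << [:assigned, list]
  end

  def on_partitions_revoked(_consumer, list)
    @events << [:revoked, list]
  end
end

listener = RecordingListener.new

# With the gem loaded this would be: config.consumer_rebalance_listener = listener
# Here the callbacks are called directly with placeholder arguments.
placeholder_consumer = Object.new
listener.on_partitions_assigned(placeholder_consumer, ["events-0", "events-1"])
listener.on_partitions_revoked(placeholder_consumer, ["events-1"])

listener.events.each { |kind, list| puts "#{kind}: #{list.join(", ")}" }
```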

#producer ⇒ Producer

Create a producer with this configuration.

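Returns:

  • (Producer)

Raises:

  • (ConfigError)

    When the configuration contains invalid options

  • (ClientCreationError)

    When the native client cannot be created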

# File 'lib/rdkafka/config.rb', line 134

def producer
  # Create opaque
  opaque = Opaque.new
  # Create Kafka config
  config = native_config(opaque)
  # Set callback to receive delivery reports on config
  Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Bindings::DeliveryCallback)
  # Return producer with Kafka client
  Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)).tap do |producer|
    opaque.producer = producer
  end
end