Class: Rdkafka::Config
- Inherits:
-
Object
- Object
- Rdkafka::Config
- Defined in:
- lib/rdkafka/config.rb
Overview
Configuration for a Kafka consumer or producer. You can create an instance and use the consumer and producer methods to create a client. Documentation of the available configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
Defined Under Namespace
Classes: ClientCreationError, ConfigError, NoLoggerError
Constant Summary collapse
- DEFAULT_CONFIG =
Default config that can be overwritten.
{ # Request api version so advanced features work :"api.version.request" => true }.freeze
- REQUIRED_CONFIG =
Required config that cannot be overwritten.
{ # Enable log queues so we get callbacks in our own Ruby threads :"log.queue" => true }.freeze
Class Method Summary collapse
-
.logger ⇒ Logger
Returns the current logger, by default this is a logger to stdout.
-
.logger=(logger) ⇒ nil
Set the logger that will be used for all logging output by this library.
-
.statistics_callback ⇒ Proc?
Returns the current statistics callback, by default this is nil.
-
.statistics_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits statistics.
Instance Method Summary collapse
-
#[](key) ⇒ String?
Get a config option with the specified key.
-
#[]=(key, value) ⇒ nil
Set a config option.
-
#admin ⇒ Admin
Create an admin instance with this configuration.
-
#consumer ⇒ Consumer
Create a consumer with this configuration.
-
#consumer_rebalance_listener=(listener) ⇒ Object
Get notifications on partition assignment/revocation for the subscribed topics.
-
#initialize(config_hash = {}) ⇒ Config
constructor
Returns a new config with the provided options which are merged with DEFAULT_CONFIG.
-
#producer ⇒ Producer
Create a producer with this configuration.
Constructor Details
#initialize(config_hash = {}) ⇒ Config
Returns a new config with the provided options which are merged with DEFAULT_CONFIG.
73 74 75 76 |
# File 'lib/rdkafka/config.rb', line 73 def initialize(config_hash = {}) @config_hash = DEFAULT_CONFIG.merge(config_hash) @consumer_rebalance_listener = nil end |
Class Method Details
.logger ⇒ Logger
Returns the current logger, by default this is a logger to stdout.
18 19 20 |
# File 'lib/rdkafka/config.rb', line 18 def self.logger @@logger end |
.logger=(logger) ⇒ nil
Set the logger that will be used for all logging output by this library.
27 28 29 30 |
# File 'lib/rdkafka/config.rb', line 27 def self.logger=(logger) raise NoLoggerError if logger.nil? @@logger=logger end |
.statistics_callback ⇒ Proc?
Returns the current statistics callback, by default this is nil.
47 48 49 |
# File 'lib/rdkafka/config.rb', line 47 def self.statistics_callback @@statistics_callback end |
.statistics_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits statistics.
You can configure if and how often this happens using statistics.interval.ms
.
The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
39 40 41 42 |
# File 'lib/rdkafka/config.rb', line 39 def self.statistics_callback=(callback) raise TypeError.new("Callback has to be a proc or lambda") unless callback.is_a? Proc @@statistics_callback = callback end |
Instance Method Details
#[](key) ⇒ String?
Get a config option with the specified key
93 94 95 |
# File 'lib/rdkafka/config.rb', line 93 def [](key) @config_hash[key] end |
#[]=(key, value) ⇒ nil
Set a config option.
84 85 86 |
# File 'lib/rdkafka/config.rb', line 84 def []=(key, value) @config_hash[key] = value end |
#admin ⇒ Admin
Create an admin instance with this configuration.
153 154 155 156 157 158 |
# File 'lib/rdkafka/config.rb', line 153 def admin opaque = Opaque.new config = native_config(opaque) Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction) Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer)) end |
#consumer ⇒ Consumer
Create a consumer with this configuration.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/rdkafka/config.rb', line 110 def consumer opaque = Opaque.new config = native_config(opaque) if @consumer_rebalance_listener opaque.consumer_rebalance_listener = @consumer_rebalance_listener Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback) end kafka = native_kafka(config, :rd_kafka_consumer) # Redirect the main queue to the consumer Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) # Return consumer with Kafka client Rdkafka::Consumer.new(kafka) end |
#consumer_rebalance_listener=(listener) ⇒ Object
Get notifications on partition assignment/revocation for the subscribed topics
100 101 102 |
# File 'lib/rdkafka/config.rb', line 100 def consumer_rebalance_listener=(listener) @consumer_rebalance_listener = listener end |
#producer ⇒ Producer
Create a producer with this configuration.
134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/rdkafka/config.rb', line 134 def producer # Create opaque opaque = Opaque.new # Create Kafka config config = native_config(opaque) # Set callback to receive delivery reports on config Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction) # Return producer with Kafka client Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)).tap do |producer| opaque.producer = producer end end |