Class: Kafka::Config

Inherits:
Object
Defined in:
lib/kafka/config.rb

Constructor Details

#initialize(opts = {}) ⇒ Config

Create a new Config for initializing a Kafka Consumer or Producer. This config is reusable and can be used to configure multiple Consumers or Producers.

Parameters:

  • opts (Hash{[String, Symbol] => [String, Integer, nil, Boolean]}) (defaults to: {})

Raises:

  • (TypeError)

    Value was not of the correct type

# File 'lib/kafka/config.rb', line 14

def initialize(opts = {})
  @opts = {}
  @callbacks = {}

  # Use #set to rekey the options as strings and type check the value.
  opts.each_pair do |key, val|
    set(key, val)
  end
end
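
The rekeying described in the comment above can be sketched as follows. So that the sketch runs without the gem installed, it uses a stand-in class, SketchConfig (a hypothetical name), that copies the #initialize, #set, and #get bodies from the listings on this page; with the gem loaded, Kafka::Config would take its place. The option names are ordinary librdkafka properties chosen only for illustration.

```ruby
# Stand-in that mirrors Kafka::Config#initialize, #set, and #get as shown in
# this page's source listings. SketchConfig is a hypothetical name.
class SketchConfig
  def initialize(opts = {})
    @opts = {}

    # Every option goes through #set, so keys are stored as Strings.
    opts.each_pair { |key, val| set(key, val) }
  end

  def set(key, val)
    key = key.to_s

    @opts[key] =
      case val
      when String, Integer, true, false, nil
        val
      else
        raise TypeError, "#{key}'s value must be a String, Integer, true, or false"
      end

    nil
  end

  def get(key)
    @opts[key.to_s]
  end
end

config = SketchConfig.new({
  :"bootstrap.servers" => "localhost:9092", # Symbol key
  "group.id"           => "example-group",  # String key
  "enable.auto.commit" => false,
})

config.get("bootstrap.servers") # String lookup of an option set with a Symbol key
config.get(:"group.id")         # Symbol lookup of an option set with a String key
config.get("client.id")         # never set, so the lookup yields nil
```

Because both #set and #get call to_s on the key, Symbol and String keys are interchangeable.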

Instance Method Details

#get(key) ⇒ String, Integer, Boolean, nil

Retrieve the configured value for the key.

Returns:

  • (String, Integer, Boolean)

    Configured value for the given key

  • (nil)

    Value is not set



# File 'lib/kafka/config.rb', line 28

def get(key)
  @opts[key.to_s]
end

#on_consume(&block) ⇒ Object

Note:

Consumer only



# File 'lib/kafka/config.rb', line 67

def on_consume(&block)
  @callbacks[:consume] = block
end

#on_delivery_report(&block) ⇒ Object

Note:

Producer only

Callback for the delivery status of a message published to the Kafka cluster.



# File 'lib/kafka/config.rb', line 60

def on_delivery_report(&block)
  @callbacks[:delivery_report] = block
end

#on_error(&block) ⇒ Object

Callback for errors from the cluster. Most errors are informational and can be ignored, since librdkafka will attempt to recover. Fatal errors can also be reported, however, and should cause the system to shut down gracefully.



# File 'lib/kafka/config.rb', line 86

def on_error(&block)
  @callbacks[:error] = block
end
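
Each on_* method stores its block under a fixed key in @callbacks, so registering a second block for the same event replaces the first. A minimal sketch of that behavior, using a stand-in class (SketchConfig, a hypothetical name) that copies the #on_error body above. The `callbacks` reader exists only so the sketch can inspect state, and the block parameters are an assumption, since this page does not document what arguments the callbacks receive.

```ruby
# Stand-in that mirrors the on_* registration pattern shown on this page.
# SketchConfig and its `callbacks` reader are hypothetical.
class SketchConfig
  attr_reader :callbacks

  def initialize
    @callbacks = {}
  end

  def on_error(&block)
    @callbacks[:error] = block
  end
end

config = SketchConfig.new

# The argument list is an assumption; *args accepts whatever is passed.
first  = config.on_error { |*args| warn "first handler: #{args.inspect}" }
second = config.on_error { |*args| warn "second handler: #{args.inspect}" }

# Only the most recently registered block is kept for the :error key.
active = config.callbacks[:error]
```

To run several handlers for one event, fan out to them from inside a single registered block.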

#on_log(&block) ⇒ Object

Callback for log messages



# File 'lib/kafka/config.rb', line 100

def on_log(&block)
  @callbacks[:log] = block
end

#on_offset_commit(&block) ⇒ Object

Note:

Consumer only

Callback for result of automatic or manual offset commits.



# File 'lib/kafka/config.rb', line 76

def on_offset_commit(&block)
  @callbacks[:offset_commit] = block
end

#on_stats(&block) ⇒ Object

Callback for connection stats



# File 'lib/kafka/config.rb', line 107

def on_stats(&block)
  @callbacks[:stats] = block
end

#on_throttle(&block) ⇒ Object

Callback for when Brokers throttle a client



# File 'lib/kafka/config.rb', line 93

def on_throttle(&block)
  @callbacks[:throttle] = block
end

#set(key, val) ⇒ Object

Set configuration option `key` to `val`.

Parameters:

  • key (#to_s)

    Configuration option

  • val (String, Integer, Boolean, nil)

Raises:

  • (TypeError)

    Value was not of the correct type

# File 'lib/kafka/config.rb', line 40

def set(key, val)
  key = key.to_s

  @opts[key] =
    case val
    when String, Integer, true, false, nil
      val
    else
      raise TypeError, "#{key}'s value must be a String, Integer, true, or false"
    end

  nil
end
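
A short sketch of the type check in #set. It again uses a stand-in class (SketchConfig, a hypothetical name) that copies the #set and #get bodies from this page, so it runs without the gem. The option names are illustrative librdkafka properties.

```ruby
# Stand-in that mirrors Kafka::Config#set and #get as shown on this page.
# SketchConfig is a hypothetical name.
class SketchConfig
  def initialize
    @opts = {}
  end

  def set(key, val)
    key = key.to_s

    @opts[key] =
      case val
      when String, Integer, true, false, nil
        val
      else
        raise TypeError, "#{key}'s value must be a String, Integer, true, or false"
      end

    nil
  end

  def get(key)
    @opts[key.to_s]
  end
end

config = SketchConfig.new

# #set returns nil rather than the stored value.
result = config.set(:"queue.buffering.max.ms", 50)

# A Float, a Symbol, and an Array all fall through to the else branch.
rejected = []
[1.5, :earliest, ["a"]].each do |bad|
  begin
    config.set("auto.offset.reset", bad)
  rescue TypeError => e
    rejected << e.message
  end
end
```

The raise happens before the assignment completes, so a rejected value leaves the option unset. Symbols are not accepted as values, so pass `"earliest"` rather than `:earliest`.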

#to_ffi ⇒ Kafka::FFI::Config

Allocate and configure a new Kafka::FFI::Config that mirrors this Config. The returned Kafka::FFI::Config should either be passed to a new Client during initialization or eventually destroyed. Once passed to a Client, the Config is owned by that Client and should not be modified or destroyed.

Returns:

  • (Kafka::FFI::Config)

# File 'lib/kafka/config.rb', line 117

def to_ffi
  conf = Kafka::FFI.rd_kafka_conf_new

  @opts.each do |name, value|
    conf.set(name, value)
  end

  # Omitted callbacks:
  #  - background_event - Requires lower level usage
  #  - rebalance        - Requires knowing the rebalance semantics
  #  - all socket       - Unknown need at this level
  #  - ssl_cert_verify  - Currently not needed
  #  - oauthbearer_token_refresh - Unable to test
  @callbacks.each do |name, callback|
    case name
    when :delivery_report  then conf.set_dr_msg_cb(&callback)
    when :consume          then conf.set_consume_cb(&callback)
    when :offset_commit    then conf.set_offset_commit_cb(&callback)
    when :error            then conf.set_error_cb(&callback)
    when :throttle         then conf.set_throttle_cb(&callback)
    when :log              then conf.set_log_cb(&callback)
    when :stats            then conf.set_stats_cb(&callback)
    end
  end

  conf
end
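
The dispatch in #to_ffi can be exercised without librdkafka by swapping in a recording stub. In the sketch below, RecordingConf, SETTERS, and apply are hypothetical names introduced for illustration. The stub only records which setters were called, and the lookup table stands in for the case statement above. The setter names are taken from the listing, and nothing here reflects how Kafka::FFI::Config behaves internally.

```ruby
# Hypothetical stand-in for Kafka::FFI::Config. It records calls instead of
# configuring librdkafka.
class RecordingConf
  attr_reader :calls

  def initialize
    @calls = []
  end

  def set(name, value)
    @calls << [:set, name, value]
  end

  # Setter names copied from the case statement in #to_ffi.
  %i[
    set_dr_msg_cb set_consume_cb set_offset_commit_cb set_error_cb
    set_throttle_cb set_log_cb set_stats_cb
  ].each do |setter|
    define_method(setter) { |&callback| @calls << [setter, callback] }
  end
end

# The same name-to-setter mapping as the case statement, as a lookup table.
SETTERS = {
  delivery_report: :set_dr_msg_cb,
  consume:         :set_consume_cb,
  offset_commit:   :set_offset_commit_cb,
  error:           :set_error_cb,
  throttle:        :set_throttle_cb,
  log:             :set_log_cb,
  stats:           :set_stats_cb,
}.freeze

# Apply options first, then callbacks, skipping callback names with no setter.
def apply(conf, opts, callbacks)
  opts.each { |name, value| conf.set(name, value) }

  callbacks.each do |name, callback|
    setter = SETTERS[name]
    conf.public_send(setter, &callback) if setter
  end

  conf
end

on_error = proc { |*args| warn "error callback: #{args.inspect}" }
conf = apply(RecordingConf.new, { "client.id" => "sketch" }, { error: on_error })
```

As in the original case statement, a callback key with no matching setter is silently skipped.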