Module: Kafka::BrokerUri

Defined in:
lib/kafka/broker_uri.rb

Constant Summary collapse

DEFAULT_PORT =
9092
URI_SCHEMES =
["kafka", "kafka+ssl", "plaintext", "ssl"]

Class Method Summary collapse

Class Method Details

.parse(str) ⇒ URI

Parses a Kafka broker URI string.

Examples of valid strings:

  • ‘kafka1.something`

  • ‘kafka1.something:1234`

  • ‘kafka://kafka1.something:1234`

  • ‘kafka+ssl://kafka1.something:1234`

  • ‘plaintext://kafka1.something:1234`

Parameters:

  • str (String)

    a Kafka broker URI string.

Returns:

  • (URI)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/kafka/broker_uri.rb', line 21

def self.parse(str)
  # Make sure there's a scheme part if it's missing.
  str = "kafka://" + str unless str.include?("://")

  uri = URI.parse(str)
  uri.port ||= DEFAULT_PORT

  # Map some schemes to others.
  case uri.scheme
  when 'plaintext'
    uri.scheme = 'kafka'
  when 'ssl'
    uri.scheme = 'kafka+ssl'
  end

  unless URI_SCHEMES.include?(uri.scheme)
    raise Kafka::Error, "invalid protocol `#{uri.scheme}` in `#{str}`"
  end

  uri
end