Class: Kafka::Sasl::OAuth

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/sasl/oauth.rb

Constant Summary collapse

OAUTH_IDENT =
"OAUTHBEARER"

Instance Method Summary collapse

Constructor Details

#initialize(logger:, token_provider:) ⇒ OAuth

token_provider: THE FOLLOWING INTERFACE MUST BE FULFILLED:

REQUIRED

TokenProvider#token - Returns an ID/Access Token that the Kafka client sends to the broker.

The implementation should ensure token reuse so that multiple calls at connect time do not
create multiple tokens. The implementation should also periodically refresh the token in
order to guarantee that each call returns an unexpired token. A timeout error should
be returned after a short period of inactivity so that the broker can log debugging
info and retry.
OPTIONAL

TokenProvider#extensions - Returns a map of key-value pairs that can be sent with the
SASL/OAUTHBEARER initial client response. If the provider does not implement this method, no extension values are sent. This feature
is only available in Kafka >= 2.1.0.


21
22
23
24
# File 'lib/kafka/sasl/oauth.rb', line 21

# Sets up the OAUTHBEARER authenticator.
#
# @param logger [Object] logger to wrap in a TaggedLogger for this instance's output
# @param token_provider [Object] object that supplies the token; stored unchanged
def initialize(logger:, token_provider:)
  @token_provider = token_provider
  @logger = TaggedLogger.new(logger)
end

Instance Method Details

#authenticate!(host, encoder, decoder) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/kafka/sasl/oauth.rb', line 34

# Performs the SASL OAUTHBEARER exchange: writes the initial client response
# and waits for the server's reply.
#
# @param host [Object] interpolated into the debug log line only
# @param encoder [#write_bytes] receives the initial client response
# @param decoder [#bytes] the server response is read from it
# @raise [Kafka::Error] if reading times out, hits end-of-file, or yields no message
# @return [Object] whatever the final `@logger.debug` call returns
def authenticate!(host, encoder, decoder)
  @logger.debug "Authenticating to #{host} with SASL #{OAUTH_IDENT}"

  # Send the SASL OAuthBearer client response (built by a helper not shown here).
  encoder.write_bytes(initial_client_response)

  # Read the SASL OAuthBearer server response; transport-level failures are
  # translated into Kafka::Error so callers deal with a single error type.
  server_response =
    begin
      decoder.bytes
    rescue Errno::ETIMEDOUT, EOFError => error
      raise Kafka::Error, "SASL #{OAUTH_IDENT} authentication failed: #{error.message}"
    end

  # A missing response is treated as a failure with no further detail.
  unless server_response
    raise Kafka::Error, "SASL #{OAUTH_IDENT} authentication failed: unknown error"
  end

  @logger.debug "SASL #{OAUTH_IDENT} authentication successful."
end

#configured? ⇒ Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/kafka/sasl/oauth.rb', line 30

# Reports whether a token provider was supplied to this mechanism.
#
# @return [Boolean] true when a token provider is set, false otherwise
def configured?
  # Coerce to a strict boolean: the documented return type is Boolean, and
  # returning the provider itself would expose it to callers of a predicate.
  !!@token_provider
end

#ident ⇒ Object



26
27
28
# File 'lib/kafka/sasl/oauth.rb', line 26

# Identifier of the SASL mechanism handled by this class.
#
# @return [String] the OAUTH_IDENT constant ("OAUTHBEARER")
def ident
  OAUTH_IDENT
end