Class: Kafka::Sasl::OAuth
- Inherits:
-
Object
- Object
- Kafka::Sasl::OAuth
- Defined in:
- lib/kafka/sasl/oauth.rb
Constant Summary collapse
- OAUTH_IDENT =
"OAUTHBEARER"
Instance Method Summary collapse
- #authenticate!(host, encoder, decoder) ⇒ Object
- #configured? ⇒ Boolean
- #ident ⇒ Object
-
#initialize(logger:, token_provider:) ⇒ OAuth
constructor
token_provider: THE FOLLOWING INTERFACE MUST BE FULFILLED:.
Constructor Details
#initialize(logger:, token_provider:) ⇒ OAuth
token_provider: THE FOLLOWING INTERFACE MUST BE FULFILLED:
- REQUIRED
-
TokenProvider#token - Returns an ID/Access Token to be sent to the Kafka client.
The implementation should ensure token reuse so that multiple calls at connect time do not
create multiple tokens. The implementation should also periodically refresh the token in
order to guarantee that each call returns an unexpired token. A timeout error should
be returned after a short period of inactivity so that the broker can log debugging
info and retry.
- OPTIONAL
-
TokenProvider#extensions - Returns a map of key-value pairs that can be sent with the
SASL/OAUTHBEARER initial client response. If not provided, the values are ignored. This feature
is only available in Kafka >= 2.1.0.
21 22 23 24 |
# File 'lib/kafka/sasl/oauth.rb', line 21 def initialize(logger:, token_provider:) @logger = TaggedLogger.new(logger) @token_provider = token_provider end |
Instance Method Details
#authenticate!(host, encoder, decoder) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/kafka/sasl/oauth.rb', line 34 def authenticate!(host, encoder, decoder) # Send SASLOauthBearerClientResponse with token @logger.debug "Authenticating to #{host} with SASL #{OAUTH_IDENT}" encoder.write_bytes(initial_client_response) begin # receive SASL OAuthBearer Server Response msg = decoder.bytes raise Kafka::Error, "SASL #{OAUTH_IDENT} authentication failed: unknown error" unless msg rescue Errno::ETIMEDOUT, EOFError => e raise Kafka::Error, "SASL #{OAUTH_IDENT} authentication failed: #{e.}" end @logger.debug "SASL #{OAUTH_IDENT} authentication successful." end |
#configured? ⇒ Boolean
30 31 32 |
# File 'lib/kafka/sasl/oauth.rb', line 30 def configured? @token_provider end |
#ident ⇒ Object
26 27 28 |
# File 'lib/kafka/sasl/oauth.rb', line 26 def ident OAUTH_IDENT end |