Class: Kafka::SaslAuthenticator

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/sasl_authenticator.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:, sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:, sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:, sasl_oauth_token_provider:, sasl_aws_msk_iam_access_key_id:, sasl_aws_msk_iam_secret_key_id:, sasl_aws_msk_iam_aws_region:, sasl_aws_msk_iam_session_token: nil) ⇒ SaslAuthenticator

Returns a new instance of SaslAuthenticator.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/kafka/sasl_authenticator.rb', line 11

# Builds one handler object for every SASL mechanism this class knows about
# and remembers the first handler that reports itself as configured.
#
# Every handler receives the same TaggedLogger wrapping +logger+. The
# remaining keyword arguments are forwarded unchanged to their handler:
#
# * sasl_plain_*       -> Sasl::Plain     (authzid, username, password)
# * sasl_gssapi_*      -> Sasl::Gssapi    (principal, keytab)
# * sasl_scram_*       -> Sasl::Scram     (username, password, mechanism)
# * sasl_aws_msk_iam_* -> Sasl::AwsMskIam (access_key_id, secret_key_id,
#                                          aws_region, session_token)
# * sasl_oauth_token_provider -> Sasl::OAuth (token_provider)
#
# +sasl_aws_msk_iam_session_token+ is the only optional argument and
# defaults to nil.
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
               sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
               sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:,
               sasl_oauth_token_provider:,
               sasl_aws_msk_iam_access_key_id:,
               sasl_aws_msk_iam_secret_key_id:,
               sasl_aws_msk_iam_aws_region:,
               sasl_aws_msk_iam_session_token: nil)
  @logger = TaggedLogger.new(logger)
  shared = { logger: @logger }

  @plain = Sasl::Plain.new(authzid: sasl_plain_authzid,
                           username: sasl_plain_username,
                           password: sasl_plain_password,
                           **shared)

  @gssapi = Sasl::Gssapi.new(principal: sasl_gssapi_principal,
                             keytab: sasl_gssapi_keytab,
                             **shared)

  @scram = Sasl::Scram.new(username: sasl_scram_username,
                           password: sasl_scram_password,
                           mechanism: sasl_scram_mechanism,
                           **shared)

  @aws_msk_iam = Sasl::AwsMskIam.new(access_key_id: sasl_aws_msk_iam_access_key_id,
                                     secret_key_id: sasl_aws_msk_iam_secret_key_id,
                                     aws_region: sasl_aws_msk_iam_aws_region,
                                     session_token: sasl_aws_msk_iam_session_token,
                                     **shared)

  @oauth = Sasl::OAuth.new(token_provider: sasl_oauth_token_provider, **shared)

  # The position in this list is the precedence: the earliest configured
  # handler wins; @mechanism stays nil when none is configured.
  candidates = [@gssapi, @plain, @scram, @oauth, @aws_msk_iam]
  @mechanism = candidates.find { |handler| handler.configured? }
end

Instance Method Details

#authenticate!(connection) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/kafka/sasl_authenticator.rb', line 61

# Runs the SASL handshake on +connection+ and then hands the connection over
# to the selected mechanism.
#
# Returns nil immediately, without sending anything, when #enabled? is false.
# Otherwise a SaslHandshakeRequest carrying the mechanism's ident is sent.
# The response must have error_code 0 and list that ident among its
# enabled_mechanisms; if not, Kafka::Error is raised. On success the
# mechanism's own authenticate! is called with the connection's string
# form, encoder and decoder, and its result is returned.
def authenticate!(connection)
  return unless enabled?

  mechanism_name = @mechanism.ident
  handshake = Kafka::Protocol::SaslHandshakeRequest.new(mechanism_name)
  reply = connection.send_request(handshake)

  accepted = reply.error_code == 0 && reply.enabled_mechanisms.include?(mechanism_name)
  raise Kafka::Error, "#{mechanism_name} is not supported." unless accepted

  @mechanism.authenticate!(connection.to_s, connection.encoder, connection.decoder)
end

#enabled? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/kafka/sasl_authenticator.rb', line 57

# Reports whether a SASL mechanism was selected for this authenticator.
#
# Returns false when @mechanism is nil and true otherwise.
def enabled?
  return false if @mechanism.nil?

  true
end