Class: Kafka::Sasl::AwsMskIam
- Inherits: Object
- Defined in: lib/kafka/sasl/awsmskiam.rb
Constant Summary
- AWS_MSK_IAM = "AWS_MSK_IAM"
Instance Method Summary
- #authenticate!(host, encoder, decoder) ⇒ Object
- #configured? ⇒ Boolean
- #ident ⇒ Object
- #initialize(aws_region:, access_key_id:, secret_key_id:, session_token: nil, logger:) ⇒ AwsMskIam (constructor): A new instance of AwsMskIam.
Constructor Details
#initialize(aws_region:, access_key_id:, secret_key_id:, session_token: nil, logger:) ⇒ AwsMskIam
Returns a new instance of AwsMskIam.
    # File 'lib/kafka/sasl/awsmskiam.rb', line 12

    def initialize(aws_region:, access_key_id:, secret_key_id:, session_token: nil, logger:)
      @semaphore = Mutex.new

      @aws_region = aws_region
      @access_key_id = access_key_id
      @secret_key_id = secret_key_id
      @session_token = session_token
      @logger = TaggedLogger.new(logger)
    end
Instance Method Details
#authenticate!(host, encoder, decoder) ⇒ Object
    # File 'lib/kafka/sasl/awsmskiam.rb', line 30

    def authenticate!(host, encoder, decoder)
      @logger.debug "Authenticating #{@access_key_id} with SASL #{AWS_MSK_IAM}"

      # Drop the port so that only the broker hostname goes into the payload.
      host_without_port = host.split(':', -1).first

      time_now = Time.now.utc

      msg = authentication_payload(host: host_without_port, time_now: time_now)
      @logger.debug "Sending first client SASL AWS_MSK_IAM message:"
      @logger.debug msg
      encoder.write_bytes(msg)

      begin
        # A missing (nil) reply from the broker is treated as a failure.
        @server_first_message = decoder.bytes
        @logger.debug "Received first server SASL AWS_MSK_IAM message: #{@server_first_message}"
        raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: unknown error" unless @server_first_message
      rescue Errno::ETIMEDOUT, EOFError => e
        raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: #{e.message}"
      end

      @logger.debug "SASL #{AWS_MSK_IAM} authentication successful"
    end
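The control flow of authenticate! can be tried in isolation with in-memory stand-ins. The sketch below is only an illustration under stated assumptions: FakeEncoder, FakeDecoder, HandshakeError, and handshake are hypothetical names that are not part of the library, and the payload string is a placeholder for the output of the private authentication_payload method, which this page does not show.

```ruby
# Stand-in error class; the real method raises Kafka::Error.
class HandshakeError < StandardError; end

# Hypothetical in-memory encoder that records what would be sent to the broker.
class FakeEncoder
  attr_reader :written

  def initialize
    @written = []
  end

  def write_bytes(bytes)
    @written << bytes
  end
end

# Hypothetical decoder that returns one canned broker reply.
class FakeDecoder
  def initialize(response)
    @response = response
  end

  def bytes
    @response
  end
end

# Mirrors the shape of authenticate!: strip the port, send one message,
# read one reply, and fail if the reply is missing.
def handshake(host, encoder, decoder)
  host_without_port = host.split(":", -1).first

  # Placeholder payload (assumption); the real payload is built elsewhere.
  encoder.write_bytes("payload-for-#{host_without_port}")

  begin
    reply = decoder.bytes
    raise HandshakeError, "authentication failed: unknown error" unless reply
  rescue Errno::ETIMEDOUT, EOFError => e
    raise HandshakeError, "authentication failed: #{e.message}"
  end

  host_without_port
end

encoder = FakeEncoder.new
handshake("b-1.example.com:9098", encoder, FakeDecoder.new("ok"))
# encoder.written now holds the single placeholder message.
```

Passing FakeDecoder.new(nil) instead raises HandshakeError, which corresponds to the "unknown error" branch in the listing above.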
#configured? ⇒ Boolean
    # File 'lib/kafka/sasl/awsmskiam.rb', line 26

    def configured?
      @aws_region && @access_key_id && @secret_key_id
    end
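Although the signature is documented as returning Boolean, the expression in configured? is a chain of && operators, so in plain Ruby it evaluates to the last operand or to the first nil/false one. A minimal sketch, using a hypothetical CredentialCheck class that copies only that expression (the credential values are made up):

```ruby
# Hypothetical stand-in that reproduces only the configured? expression.
class CredentialCheck
  def initialize(aws_region:, access_key_id:, secret_key_id:)
    @aws_region = aws_region
    @access_key_id = access_key_id
    @secret_key_id = secret_key_id
  end

  def configured?
    @aws_region && @access_key_id && @secret_key_id
  end
end

full = CredentialCheck.new(aws_region: "us-east-1", access_key_id: "AKIAEXAMPLE", secret_key_id: "secret")
partial = CredentialCheck.new(aws_region: "us-east-1", access_key_id: nil, secret_key_id: "secret")

full.configured?     # => "secret" (truthy, but not literally true)
partial.configured?  # => nil
!!full.configured?   # => true
```

Callers that only test the result in a conditional are unaffected; code that compares against true or serializes the value may want the double negation shown on the last line.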
#ident ⇒ Object
    # File 'lib/kafka/sasl/awsmskiam.rb', line 22

    def ident
      AWS_MSK_IAM
    end