Class: PrxRubyAwsCreds
- Inherits:
-
Object
- Object
- PrxRubyAwsCreds
- Defined in:
- lib/prx-ruby-aws-creds.rb
Class Method Summary collapse
-
.assume_role_options(profile_name) ⇒ Object
Returns the options passed to AssumeRole.
-
.cache_key_path(role_options) ⇒ Object
The cache key is based on the parameters used to request temporary credentials (using either STS AssumeRole or SSO GetRoleCredentials).
-
.client_credentials(profile_name = nil) ⇒ Object
Returns temporary IAM client (Aws::Credentials) credentials for a given profile.
-
.get_and_cache_credentials(profile_name) ⇒ Object
Makes a request to some AWS API endpoint that can generate temporary IAM credentials (e.g., AssumeRole, GetRoleCredentials, etc) based on the configuration of the selected profile.
-
.load_and_verify_cached_credentials(profile_name) ⇒ Object
For the selected profile, look for a set of cached temporary IAM credentials.
-
.sso_get_cached_access_token(start_url) ⇒ Object
For a given SSO start URL, return a valid access token from the cache.
-
.sso_get_role_options(profile_name) ⇒ Object
Returns the options passed to SSO#get_role_credentials.
Class Method Details
.assume_role_options(profile_name) ⇒ Object
Returns the options passed to AssumeRole. This is used when the profile uses a key/secret. If the selected profile is not configured for key/secret, returns nil.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/prx-ruby-aws-creds.rb', line 118 def (profile_name) aws_config_file = IniFile.load(AWS_CONFIG_FILE) aws_config_file_section = aws_config_file["profile #{profile_name}"] # Get the role ARN for the selected profile role_arn = aws_config_file_section["role_arn"] # Extract some values from the ARN role_name = role_arn.split("role/")[1] account_id = role_arn.split(":")[4] { role_arn: "arn:aws:sts::#{account_id}:role/#{role_name}", role_session_name: "ruby-sdk-session-#{Time.now.to_i}", duration_seconds: 3600 } end |
.cache_key_path(role_options) ⇒ Object
The cache key is based on the parameters used to request temporary credentials (using either STS AssumeRole or SSO GetRoleCredentials). The role session name is removed if it’s randomly generated (which it always is for us). If the options were ever to include a policy document, that should get sorted before hashing. github.com/boto/botocore/blob/88d780dea1684da00689f2eef388fa4c782ced08/botocore/credentials.py#L700
For any
45 46 47 48 49 50 51 52 |
# File 'lib/prx-ruby-aws-creds.rb', line 45 def cache_key_path() key_opts = .clone key_opts.delete(:role_session_name) key_opts.delete(:access_token) cache_key = Digest::SHA1.hexdigest(JSON.dump(key_opts)) "#{CACHE_DIRECTORY}/#{cache_key}.json" end |
.client_credentials(profile_name = nil) ⇒ Object
Returns temporary IAM client (Aws::Credentials) credentials for a given profile.
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/prx-ruby-aws-creds.rb', line 238 def client_credentials(profile_name = nil) profile_name ||= OPTS[:profile] # For the selected profile, get the appropriate set of options. = (profile_name) || (profile_name) return if ! # Check for a cache file with a name derived from those options. if !File.file?(cache_key_path()) # When no cache exists for these options, fetch new credentials, cache them # and return them. get_and_cache_credentials(profile_name) else # When there is a cache for these options, return them if they are still # valid, otherwise refresh them and return the new credentials. load_and_verify_cached_credentials(profile_name) end end |
.get_and_cache_credentials(profile_name) ⇒ Object
Makes a request to some AWS API endpoint that can generate temporary IAM credentials (e.g., AssumeRole, GetRoleCredentials, etc) based on the configuration of the selected profile.
Cache the resulting credentials in the CACHE_DIRECTORY. The file is named using a hash of the options passed to the endpoint.
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/prx-ruby-aws-creds.rb', line 142 def get_and_cache_credentials(profile_name) # Make sure the cache directory exists FileUtils.mkdir_p CACHE_DIRECTORY aws_config_file = IniFile.load(AWS_CONFIG_FILE) aws_config_file_section = aws_config_file["profile #{profile_name}"] if aws_config_file_section["sso_role_name"] || aws_config_file_section["sso_session"] # For SSO profiles, call GetRoleCredentials with a role, account, and # access token to get back a set of temporary credentials. # https://docs.aws.amazon.com/singlesignon/latest/PortalAPIReference/API_GetRoleCredentials.html # https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/SSO/Client.html#get_role_credentials-instance_method sso_region = aws_config_file_section["sso_session"] ? aws_config_file["sso-session #{aws_config_file_section["sso_session"]}"]["sso_region"] : aws_config_file_section["sso_region"] opts = (profile_name) sso = Aws::SSO::Client.new(region: sso_region) credentials = sso.get_role_credentials(opts) # Cache the credentials. The structure of this file doesn't exactly # match what native libraries (boto, etc) use. Instead, it matches the # default output of assume_role. It could be anything, it just needs # to be consistent across profile types, and match what # load_and_verify_cached_credentials expects. File.write(cache_key_path(opts), JSON.dump({"credentials" => { "access_key_id" => credentials.role_credentials.access_key_id, "secret_access_key" => credentials.role_credentials.secret_access_key, "session_token" => credentials.role_credentials.session_token }})) # Return the temporary IAM credentials Aws::Credentials.new(credentials.role_credentials.access_key_id, credentials.role_credentials.secret_access_key, credentials.role_credentials.session_token) elsif aws_config_file_section["mfa_serial"] # For profiles using an API key with an MFA token, get the serial # number of the MFA device associated with the profile. mfa_serial = aws_config_file_section["mfa_serial"] # Prompt the user for the current TOTP code associated with the MFA # device. mfa_code = $stdin.getpass("Enter MFA code for #{mfa_serial}: ") # Get a set of credentials for the role configured in the profile using # the TOTP code. I don't remember why, but I don't think these # credentials should be used for anything other than making another # call to assume_role. Don't cache or return these credentials. # Note: This is marked as a private API credentials = Aws.shared_config.assume_role_credentials_from_config(profile: profile_name, token_code: mfa_code.chomp) sts = Aws::STS::Client.new(region: "us-east-1", credentials: credentials) # Make a call to get_caller_identity to ensure that the first set of # credentials are valid? _id = sts.get_caller_identity # Make a regular assume_role call to get standard temporary IAM # credentials. opts = cacheable_role = sts.assume_role(opts) File.write(cache_key_path(opts), JSON.dump(cacheable_role.to_h)) # Return the temporary IAM credentials Aws::Credentials.new(cacheable_role["credentials"]["access_key_id"], cacheable_role["credentials"]["secret_access_key"], cacheable_role["credentials"]["session_token"]) end rescue Aws::SSO::Errors::UnauthorizedException raise "The SSO access token for this profile is invalid. Run 'aws sso login --profile #{profile_name}' to fetch a valid token." end |
.load_and_verify_cached_credentials(profile_name) ⇒ Object
For the selected profile, look for a set of cached temporary IAM credentials. These are vanilla IAM credentials that look the same regardless of what type of profile is selected (SSO, MFA, etc).
If no cached credential exist for the profile, or if the credentials are invalid (i.e., can’t successfully call get_caller_identity), a new set of credentials will be fetched and cached.
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/prx-ruby-aws-creds.rb', line 213 def load_and_verify_cached_credentials(profile_name) # Look up the cache file based on the options for the seleted profile. = (profile_name) || (profile_name) cached_role_json = File.read(cache_key_path()) cached_role = JSON.parse(cached_role_json) credentials = Aws::Credentials.new(cached_role["credentials"]["access_key_id"], cached_role["credentials"]["secret_access_key"], cached_role["credentials"]["session_token"]) # Verify that the credentials still work; this will raise an error if they're # bad, which we can catch sts = Aws::STS::Client.new(region: "us-east-1", credentials: credentials) sts.get_caller_identity credentials rescue Aws::STS::Errors::ExpiredToken get_and_cache_credentials(profile_name) rescue Aws::STS::Errors::InvalidClientTokenId get_and_cache_credentials(profile_name) rescue Errno::ENOENT get_and_cache_credentials(profile_name) end |
.sso_get_cached_access_token(start_url) ⇒ Object
For a given SSO start URL, return a valid access token from the cache. If no valid token is found, returns nil. An access token will only be considered valid if it has not expired.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/prx-ruby-aws-creds.rb', line 57 def sso_get_cached_access_token(start_url) Dir["#{Dir.home}/.aws/sso/cache/*.json"].each do |path| data = JSON.parse(File.read(path)) if data["startUrl"] && data["startUrl"] == start_url expiration = Time.parse(data["expiresAt"]) if expiration > Time.now return data["accessToken"] end end end nil end |
.sso_get_role_options(profile_name) ⇒ Object
Returns the options passed to SSO#get_role_credentials. This is used when the profile uses an SSO, rather than a key/secret. If the selected profile is not configured for SSO, returns nil.
‘role_name` and `account_id` are values found in the config file for the given profile. `access_token` is a SSO token found in the SSO cache for the SSO start URL associated with the given profile.
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/prx-ruby-aws-creds.rb', line 79 def (profile_name) aws_config_file = IniFile.load(AWS_CONFIG_FILE) aws_config_file_section = aws_config_file["profile #{profile_name}"] sso_start_url = aws_config_file_section["sso_session"] ? aws_config_file["sso-session #{aws_config_file_section["sso_session"]}"]["sso_start_url"] : aws_config_file_section["sso_start_url"] # The selected profile does not use SSO return if !sso_start_url # Get the SSO start URL for the selected profile profile_start_url = sso_start_url sso_access_token = sso_get_cached_access_token(profile_start_url) # If a valid token wasn't found in the cache, prompt the user to fetch a # new one. if !sso_access_token puts puts "No #{"access token".yellow} was found for this SSO start URL associated with this profile (#{profile_start_url.blue})." puts "Press #{"RETURN".green} to request a new token. This will open a web browser." puts "You can also do this manually with: 'aws sso login --profile #{profile_name}'".gray puts inp = $stdin.gets.chomp `aws sso login --profile #{profile_name}` if inp.empty? sso_access_token = sso_get_cached_access_token(profile_start_url) puts "This #{"access token".yellow} is valid for all SSO profiles using #{profile_start_url.blue} as their start URL." puts end { role_name: aws_config_file_section["sso_role_name"], account_id: aws_config_file_section["sso_account_id"].to_s, access_token: sso_access_token } end |