Source code for runway.aws_sso_botocore.credentials

"""Botocore with support for AWS SSO credential assets."""
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# pylint: disable=too-few-public-methods
import datetime
import json
import logging
import os
from hashlib import sha1

from botocore import UNSIGNED
from botocore.config import Config
from botocore.credentials import (
    AssumeRoleProvider,
    BotoProvider,
    CachedCredentialFetcher,
    CanonicalNameCredentialSourcer,
    ContainerProvider,
    CredentialProvider,
    CredentialResolver,
    DeferredRefreshableCredentials,
    EnvProvider,
    InstanceMetadataFetcher,
    InstanceMetadataProvider,
    JSONFileCache,
    OriginalEC2Provider,
)
from botocore.credentials import (
    ProfileProviderBuilder as BotocoreProfileProviderBuilder,
)
from botocore.credentials import _get_client_creator, _serialize_if_needed
from botocore.exceptions import InvalidConfigError
from dateutil.tz import tzutc

from .exceptions import UnauthorizedSSOTokenError
from .util import SSOTokenLoader

LOGGER = logging.getLogger(__name__)


[docs]def create_credential_resolver(session, cache=None, region_name=None): """Create a default credential resolver. This creates a pre-configured credential resolver that includes the default lookup chain for credentials. """ profile_name = session.get_config_variable('profile') or 'default' metadata_timeout = session.get_config_variable('metadata_service_timeout') num_attempts = session.get_config_variable('metadata_service_num_attempts') disable_env_vars = session.instance_variables().get('profile') is not None if cache is None: cache = {} env_provider = EnvProvider() container_provider = ContainerProvider() instance_metadata_provider = InstanceMetadataProvider( iam_role_fetcher=InstanceMetadataFetcher( timeout=metadata_timeout, num_attempts=num_attempts, user_agent=session.user_agent()) ) profile_provider_builder = ProfileProviderBuilder( session, cache=cache, region_name=region_name) assume_role_provider = AssumeRoleProvider( load_config=lambda: session.full_config, client_creator=_get_client_creator(session, region_name), cache=cache, profile_name=profile_name, credential_sourcer=CanonicalNameCredentialSourcer([ env_provider, container_provider, instance_metadata_provider ]), profile_provider_builder=profile_provider_builder, ) pre_profile = [ env_provider, assume_role_provider, ] profile_providers = profile_provider_builder.providers( profile_name=profile_name, disable_env_vars=disable_env_vars, ) post_profile = [ OriginalEC2Provider(), BotoProvider(), container_provider, instance_metadata_provider, ] providers = pre_profile + profile_providers + post_profile if disable_env_vars: # An explicitly provided profile will negate an EnvProvider. # We will defer to providers that understand the "profile" # concept to retrieve credentials. # The one edge case if is all three values are provided via # env vars: # export AWS_ACCESS_KEY_ID=foo # export AWS_SECRET_ACCESS_KEY=bar # export AWS_PROFILE=baz # Then, just like our client() calls, the explicit credentials # will take precedence. # # This precedence is enforced by leaving the EnvProvider in the chain. # This means that the only way a "profile" would win is if the # EnvProvider does not return credentials, which is what we want # in this scenario. providers.remove(env_provider) LOGGER.debug('Skipping environment variable credential check' ' because profile name was explicitly set.') return CredentialResolver(providers=providers)
[docs]class ProfileProviderBuilder(BotocoreProfileProviderBuilder): """Extends the botocore profile provider builder to support AWS SSO.""" def __init__(self, session, cache=None, region_name=None, sso_token_cache=None): """Instantiate class.""" super(ProfileProviderBuilder, self).__init__(session, cache, region_name) self._sso_token_cache = sso_token_cache
[docs] def providers(self, profile_name, disable_env_vars=False): """Return list of providers.""" return [ self._create_web_identity_provider( profile_name, disable_env_vars, ), self._create_sso_provider(profile_name), self._create_shared_credential_provider(profile_name), self._create_process_provider(profile_name), self._create_config_provider(profile_name), ]
def _create_sso_provider(self, profile_name): """AWS SSO credential provider.""" return SSOProvider( load_config=lambda: self._session.full_config, client_creator=self._session.create_client, profile_name=profile_name, cache=self._cache, token_cache=self._sso_token_cache, )
[docs]class SSOCredentialFetcher(CachedCredentialFetcher): """AWS SSO credential fetcher.""" def __init__(self, start_url, sso_region, role_name, account_id, client_creator, token_loader=None, cache=None, expiry_window_seconds=None): """Instantiate class.""" self._client_creator = client_creator self._sso_region = sso_region self._role_name = role_name self._account_id = account_id self._start_url = start_url self._token_loader = token_loader super(SSOCredentialFetcher, self).__init__( cache, expiry_window_seconds ) def _create_cache_key(self): """Create a predictable cache key for the current configuration. The cache key is intended to be compatible with file names. """ args = { 'startUrl': self._start_url, 'roleName': self._role_name, 'accountId': self._account_id, } # NOTE: It would be good to hoist this cache key construction logic # into the CachedCredentialFetcher class as we should be consistent. # Unfortunately, the current assume role fetchers that sub class don't # pass separators resulting in non-minified JSON. In the long term, # all fetchers should use the below caching scheme. args = json.dumps(args, sort_keys=True, separators=(',', ':')) argument_hash = sha1(args.encode('utf-8')).hexdigest() return self._make_file_safe(argument_hash) def _parse_timestamp(self, timestamp_ms): # pylint: disable=no-self-use """Parse timestamp.""" # fromtimestamp expects seconds so: milliseconds / 1000 = seconds timestamp_seconds = timestamp_ms / 1000.0 timestamp = datetime.datetime.fromtimestamp(timestamp_seconds, tzutc()) return _serialize_if_needed(timestamp) def _get_credentials(self): """Get credentials by calling SSO get role credentials.""" config = Config( signature_version=UNSIGNED, region_name=self._sso_region, ) client = self._client_creator('sso', config=config) kwargs = { 'roleName': self._role_name, 'accountId': self._account_id, 'accessToken': self._token_loader(self._start_url), } try: response = client.get_role_credentials(**kwargs) except client.exceptions.UnauthorizedException: raise UnauthorizedSSOTokenError() credentials = response['roleCredentials'] credentials = { 'ProviderType': 'sso', 'Credentials': { 'AccessKeyId': credentials['accessKeyId'], 'SecretAccessKey': credentials['secretAccessKey'], 'SessionToken': credentials['sessionToken'], 'Expiration': self._parse_timestamp(credentials['expiration']), } } return credentials
[docs]class SSOProvider(CredentialProvider): """AWS SSO credential provider.""" METHOD = 'sso' _SSO_TOKEN_CACHE_DIR = os.path.expanduser( os.path.join('~', '.aws', 'sso', 'cache') ) _SSO_CONFIG_VARS = [ 'sso_start_url', 'sso_region', 'sso_role_name', 'sso_account_id', ] # pylint: disable=super-init-not-called def __init__(self, load_config, client_creator, profile_name, cache=None, token_cache=None): """Instantiate class.""" if token_cache is None: token_cache = JSONFileCache(self._SSO_TOKEN_CACHE_DIR) self._token_cache = token_cache if cache is None: cache = {} self.cache = cache self._load_config = load_config self._client_creator = client_creator self._profile_name = profile_name def _load_sso_config(self): """Load sso config.""" loaded_config = self._load_config() profiles = loaded_config.get('profiles', {}) profile_name = self._profile_name profile_config = profiles.get(self._profile_name, {}) if all(c not in profile_config for c in self._SSO_CONFIG_VARS): return None config = {} missing_config_vars = [] for config_var in self._SSO_CONFIG_VARS: if config_var in profile_config: config[config_var] = profile_config[config_var] else: missing_config_vars.append(config_var) if missing_config_vars: missing = ', '.join(missing_config_vars) raise InvalidConfigError( error_msg=( 'The profile "%s" is configured to use SSO but is missing ' 'required configuration: %s' % (profile_name, missing) ) ) return config
[docs] def load(self): """Load AWS SSO credentials.""" sso_config = self._load_sso_config() if not sso_config: return None sso_fetcher = SSOCredentialFetcher( sso_config['sso_start_url'], sso_config['sso_region'], sso_config['sso_role_name'], sso_config['sso_account_id'], self._client_creator, token_loader=SSOTokenLoader(cache=self._token_cache), cache=self.cache, ) return DeferredRefreshableCredentials( method=self.METHOD, refresh_using=sso_fetcher.fetch_credentials, )