Source code for runway.cfngin.hooks.staticsite.auth_at_edge.lambda_config

"""CFNgin prehook responsible for creation of Lambda@Edge functions."""
from __future__ import annotations

import logging
import os
import re
import secrets
import shutil
import tempfile
from tempfile import mkstemp
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from ... import aws_lambda
from ...base import HookArgsBaseModel

if TYPE_CHECKING:
    from .....context import CfnginContext
    from ....providers.aws.default import Provider

# The functions associated with Auth@Edge
FUNCTIONS = ["check_auth", "refresh_auth", "parse_auth", "sign_out", "http_headers"]


LOGGER = logging.getLogger(__name__)


[docs]class HookArgs(HookArgsBaseModel): """Hook arguments.""" bucket: str """S3 bucket name.""" client_id: str """The ID of the Cognito User Pool Client.""" cookie_settings: Dict[str, Any] """The settings for our customized cookies.""" http_headers: Dict[str, Any] """The additional headers added to our requests.""" nonce_signing_secret_param_name: str """SSM param name to store nonce signing secret.""" oauth_scopes: List[str] """The validation scopes for our OAuth requests.""" redirect_path_refresh: str """The URL path for authorization refresh redirect (Correlates to the refresh auth lambda).""" redirect_path_sign_in: str """The URL path to be redirected to after sign in (Correlates to the parse auth lambda).""" redirect_path_sign_out: str """The URL path to be redirected to after sign out (Correlates to the root to be asked to resigning). """ required_group: Optional[str] = None """Optional User Pool group to which access should be restricted."""
# pylint: disable=too-many-locals
[docs]def write( context: CfnginContext, provider: Provider, *__args: Any, **kwargs: Any ) -> Dict[str, Any]: """Writes/Uploads the configured lambdas for Auth@Edge. Lambda@Edge does not have the ability to allow Environment variables at the time of this writing. In order to configure our lambdas with dynamic variables we first will go through and update a "shared" template with all of the configuration elements and add that to a temporary folder along with each of the individual Lambda@Edge functions. This temporary folder is then used with the CFNgin awsLambda hook to build the functions. """ cognito_domain = context.hook_data["aae_domain_updater"].get("domain") args = HookArgs.parse_obj(kwargs) config = { "client_id": args.client_id, "cognito_auth_domain": cognito_domain, "cookie_settings": args.cookie_settings, "http_headers": args.http_headers, "oauth_scopes": args.oauth_scopes, "redirect_path_auth_refresh": args.redirect_path_refresh, "redirect_path_sign_in": args.redirect_path_sign_in, "redirect_path_sign_out": args.redirect_path_sign_out, "required_group": args.required_group, "user_pool_id": context.hook_data["aae_user_pool_id_retriever"]["id"], "nonce_signing_secret": get_nonce_signing_secret( args.nonce_signing_secret_param_name, context ), } # Shared file that contains the method called for configuration data path = os.path.join(os.path.dirname(__file__), "templates", "shared.py") context_dict: Dict[str, Any] = {} with open(path, encoding="utf-8") as file_: # Dynamically replace our configuration values # in the shared.py template file with actual # calculated values shared = re.sub( r"{.+?(})$", str(config), file_.read(), count=1, flags=re.DOTALL | re.MULTILINE, ) filedir, temppath = mkstemp() # Save the file to a temp path with open(temppath, "w", encoding="utf-8") as tmp: tmp.write(shared) config = temppath os.close(filedir) # Get all of the different Auth@Edge functions for handler in FUNCTIONS: # Create a temporary folder dirpath = tempfile.mkdtemp() # Copy the template code for the specific Lambda function # to the temporary folder shutil.copytree( os.path.join(os.path.dirname(__file__), "templates", handler), dirpath, dirs_exist_ok=True, ) # Save our dynamic configuration shared file to the # temporary folder with open(config, encoding="utf-8") as shared: raw = shared.read() filename = "shared.py" with open(os.path.join(dirpath, filename), "wb") as newfile: newfile.write(raw.encode()) # Copy the shared jose-dependent util module to the temporary folder shutil.copyfile( os.path.join(os.path.dirname(__file__), "templates", "shared_jose.py"), os.path.join(dirpath, "shared_jose.py"), ) # Upload our temporary folder to our S3 bucket for # Lambda use lamb = aws_lambda.upload_lambda_functions( context, provider, bucket=args.bucket, functions={ handler: { "path": dirpath, "python_dontwritebytecode": True, "python_exclude_bin_dir": True, "python_exclude_setuptools_dirs": True, } }, ) # Add the lambda code reference to our context_dict context_dict.update(lamb) return context_dict
[docs]def get_nonce_signing_secret(param_name: str, context: CfnginContext) -> str: """Retrieve signing secret, generating & storing it first if not present.""" session = context.get_session() ssm_client = session.client("ssm") try: response = ssm_client.get_parameter(Name=param_name, WithDecryption=True) return response["Parameter"]["Value"] except ssm_client.exceptions.ParameterNotFound: secret = random_key(16) ssm_client.put_parameter( Description="Auth@Edge nonce signing secret", Name=param_name, Type="String", Value=secret, ) return secret
[docs]def random_key(length: int = 16) -> str: """Generate a random key of specified length from the allowed secret characters. Args: length: The length of the random key. """ secret_allowed_chars = ( "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~" ) return "".join(secrets.choice(secret_allowed_chars) for _ in range(length))