Source code for runway.blueprints.staticsite.auth_at_edge

"""Blueprint for the Authorization@Edge implementation of a Static Site.

Described in detail in this blogpost:
https://aws.amazon.com/blogs/networking-and-content-delivery/authorizationedge-how-to-use-lambdaedge-and-json-web-tokens-to-enhance-web-application-security/

"""
import logging
from typing import Any, Dict, List, Union  # pylint: disable=unused-import

import awacs.logs
import awacs.s3
from awacs.aws import Allow, Principal, Statement
from troposphere import (  # noqa pylint: disable=unused-import
    AccountId,
    Join,
    NoValue,
    Output,
    StackName,
    awslambda,
    cloudfront,
    iam,
    s3,
)

from runway.cfngin.context import Context  # pylint: disable=unused-import

from .staticsite import StaticSite

LOGGER = logging.getLogger("runway")


[docs]class AuthAtEdge(StaticSite): """Auth@Edge Blueprint.""" IAM_ARN_PREFIX = "arn:aws:iam::aws:policy/service-role/" AUTH_VARIABLES = { "OAuthScopes": {"type": list, "default": [], "description": "OAuth2 Scopes"}, "PriceClass": { "type": str, "default": "PriceClass_100", # US/Europe "description": "CF price class for the distribution.", }, "RedirectPathSignIn": { "type": str, "default": "/parseauth", "description": "Auth@Edge: The URL that should " "handle the redirect from Cognito " "after sign-in.", }, "RedirectPathAuthRefresh": { "type": str, "default": "/refreshauth", "description": "The URL path that should " "handle the JWT refresh request.", }, "NonSPAMode": { "type": bool, "default": False, "description": "Whether Auth@Edge should omit SPA specific settings", }, "SignOutUrl": { "type": str, "default": "/signout", "description": "The URL path that you can visit to sign-out.", }, } VARIABLES = {} # type: Dict[str, Dict[str, Union[str, Any]]] def __init__( self, name, # type: str context, # type: Context mappings=None, # type: Union[None, Dict] description=None, # type: Union[None, str] ): # type(...) -> Cleanup """Initialize the Blueprint. Args: name (str): The name of the stack. context (Context): The CFNgin Context object. mappings (Union(None, Dict)): Blueprint mappings. description (Union(None, str)): The description of the stack. """ super(AuthAtEdge, self).__init__(name, context, mappings, description) self.VARIABLES.update(StaticSite.VARIABLES) self.VARIABLES.update(self.AUTH_VARIABLES)
[docs] def create_template(self): # type: () -> None """Create the Blueprinted template for Auth@Edge.""" self.template.set_version("2010-09-09") self.template.set_description( "Authorization@Edge Static Website - Bucket, Lambdas, and Distribution" ) # Resources bucket = self.add_bucket() # self.add_bucket_policy(bucket) oai = self.add_origin_access_identity() bucket_policy = self.add_cloudfront_bucket_policy(bucket, oai) # TODO Make this available in Auth@Edge lambda_function_associations = [] # type: List[str] if self.directory_index_specified: index_rewrite = self._get_index_rewrite_role_function_and_version() lambda_function_associations = self.get_directory_index_lambda_association( lambda_function_associations, index_rewrite["version"] ) # Auth@Edge Lambdas check_auth_name = "CheckAuth" check_auth_lambda = self.get_auth_at_edge_lambda_and_ver( check_auth_name, "Check Authorization information for request", "check_auth", self.add_lambda_execution_role( "CheckAuthLambdaExecutionRole", check_auth_name ), ) http_headers_name = "HttpHeaders" http_headers_lambda = self.get_auth_at_edge_lambda_and_ver( http_headers_name, "Additional Headers added to every response", "http_headers", self.add_lambda_execution_role( "HttpHeadersLambdaExecutionRole", http_headers_name ), ) parse_auth_name = "ParseAuth" parse_auth_lambda = self.get_auth_at_edge_lambda_and_ver( parse_auth_name, "Parse the Authorization Headers/Cookies for the request", "parse_auth", self.add_lambda_execution_role( "ParseAuthLambdaExecutionRole", parse_auth_name ), ) refresh_auth_name = "RefreshAuth" refresh_auth_lambda = self.get_auth_at_edge_lambda_and_ver( refresh_auth_name, "Refresh the Authorization information when expired", "refresh_auth", self.add_lambda_execution_role( "RefreshAuthLambdaExecutionRole", refresh_auth_name ), ) sign_out_name = "SignOut" sign_out_lambda = self.get_auth_at_edge_lambda_and_ver( sign_out_name, "Sign the User out of the application", "sign_out", self.add_lambda_execution_role("SignOutLambdaExecutionRole", sign_out_name), ) # CloudFront Distribution distribution_options = self.get_distribution_options( bucket, oai, lambda_function_associations, check_auth_lambda["version"], http_headers_lambda["version"], parse_auth_lambda["version"], refresh_auth_lambda["version"], sign_out_lambda["version"], ) self.add_cloudfront_distribution(bucket_policy, distribution_options)
[docs] def get_auth_at_edge_lambda_and_ver( self, title, # type: str description, # type: str handle, # type: str role, # type: iam.Role ): # type: (...) -> Dict[str, Any] """Create a lambda function and its version. Args: title (str): The name of the function in PascalCase. description (str): Description to be displayed in the lambda panel. handle (str): The underscore separated representation of the name of the lambda. This handle is used to determine the handler for the lambda as well as identify the correct Code hook_data information. role (IAM.Role): The Lambda Execution Role. """ function = self.get_auth_at_edge_lambda(title, description, handle, role) return {"function": function, "version": self.add_version(title, function)}
[docs] def get_auth_at_edge_lambda( self, title, # type: str description, # type: str handler, # type: str role, # type: iam.Role ): # type: (...) -> awslambda.Function """Create an Auth@Edge lambda resource. Args: title (str): The name of the function in PascalCase. description (str): Description to be displayed in the lambda panel. handler (str): The underscore separated representation of the name of the lambda. This handle is used to determine the handler for the lambda as well as identify the correct Code hook_data information. role (IAM.Role): The Lambda Execution Role. """ lamb = self.template.add_resource( awslambda.Function( title, DeletionPolicy="Retain", Code=self.context.hook_data["aae_lambda_config"][handler], Description=description, Handler="__init__.handler", Role=role.get_att("Arn"), Runtime="python3.7", ) ) self.template.add_output( Output( "Lambda%sArn" % title, Description="Arn For the %s Lambda Function" % title, Value=lamb.get_att("Arn"), ) ) return lamb
[docs] def add_version( self, title, # type: str lambda_function, # type: awslambda.Function ): # type: (...) -> awslambda.Version """Create a version association with a Lambda@Edge function. In order to ensure different versions of the function are appropriately uploaded a hash based on the code of the lambda is appended to the name. As the code changes so will this hash value. Args: title (str): The name of the function in PascalCase. lambda_function (awslambda.Function): The Lambda function. """ s3_key = lambda_function.properties["Code"].to_dict()["S3Key"] code_hash = s3_key.split(".")[0].split("-")[-1] return self.template.add_resource( awslambda.Version( title + "Ver" + code_hash, FunctionName=lambda_function.ref() ) )
[docs] def get_distribution_options( self, bucket, # type: s3.Bucket oai, # type: cloudfront.CloudFrontOriginAccessIdentity lambda_funcs, # type: List[cloudfront.LambdaFunctionAssociation] check_auth_lambda_version, # type: awslambda.Version http_headers_lambda_version, # type: awslambda.Version parse_auth_lambda_version, # type: awslambda.Version refresh_auth_lambda_version, # type: awslambda.Version sign_out_lambda_version, # type: awslambda.Version ): # type: (...) -> Dict[str, Any] """Retrieve the options for our CloudFront distribution. Keyword Args: bucket: The bucket resource. oai: The origin access identity resource. lambda_funcs: List of Lambda Function associations. check_auth_lambda_version: Lambda Function Version to use. http_headers_lambda_version: Lambda Function Version to use. parse_auth_lambda_version: Lambda Function Version to use. refresh_auth_lambda_version: Lambda Function Version to use. sign_out_lambda_version: Lambda Function Version to use. Return: The CloudFront Distribution Options. """ variables = self.get_variables() default_cache_behavior_lambdas = lambda_funcs default_cache_behavior_lambdas.append( cloudfront.LambdaFunctionAssociation( EventType="viewer-request", LambdaFunctionARN=check_auth_lambda_version.ref(), ) ) default_cache_behavior_lambdas.append( cloudfront.LambdaFunctionAssociation( EventType="origin-response", LambdaFunctionARN=http_headers_lambda_version.ref(), ) ) return { "Aliases": self.add_aliases(), "Origins": [ cloudfront.Origin( DomainName=Join(".", [bucket.ref(), "s3.amazonaws.com"]), S3OriginConfig=cloudfront.S3OriginConfig( OriginAccessIdentity=Join( "", ["origin-access-identity/cloudfront/", oai.ref()] ) ), Id="protected-bucket", ) ], "CacheBehaviors": [ cloudfront.CacheBehavior( PathPattern=variables["RedirectPathSignIn"], Compress=True, ForwardedValues=cloudfront.ForwardedValues(QueryString=True), LambdaFunctionAssociations=[ cloudfront.LambdaFunctionAssociation( EventType="viewer-request", LambdaFunctionARN=parse_auth_lambda_version.ref(), ) ], TargetOriginId="protected-bucket", ViewerProtocolPolicy="redirect-to-https", ), cloudfront.CacheBehavior( PathPattern=variables["RedirectPathAuthRefresh"], Compress=True, ForwardedValues=cloudfront.ForwardedValues(QueryString=True), LambdaFunctionAssociations=[ cloudfront.LambdaFunctionAssociation( EventType="viewer-request", LambdaFunctionARN=refresh_auth_lambda_version.ref(), ) ], TargetOriginId="protected-bucket", ViewerProtocolPolicy="redirect-to-https", ), cloudfront.CacheBehavior( PathPattern=variables["SignOutUrl"], Compress=True, ForwardedValues=cloudfront.ForwardedValues(QueryString=True), LambdaFunctionAssociations=[ cloudfront.LambdaFunctionAssociation( EventType="viewer-request", LambdaFunctionARN=sign_out_lambda_version.ref(), ) ], TargetOriginId="protected-bucket", ViewerProtocolPolicy="redirect-to-https", ), ], "DefaultCacheBehavior": cloudfront.DefaultCacheBehavior( AllowedMethods=["GET", "HEAD"], Compress=True, DefaultTTL="86400", ForwardedValues=cloudfront.ForwardedValues(QueryString=True,), LambdaFunctionAssociations=default_cache_behavior_lambdas, TargetOriginId="protected-bucket", ViewerProtocolPolicy="redirect-to-https", ), "DefaultRootObject": "index.html", "Logging": self.add_logging_bucket(), "PriceClass": variables["PriceClass"], "Enabled": True, "WebACLId": self.add_web_acl(), "CustomErrorResponses": self._get_error_responses(), "ViewerCertificate": self.add_acm_cert(), }
def _get_error_responses(self): """Return error response based on site stack variables. When custom_error_responses are defined return those, if running in NonSPAMode return nothing, or return the standard error responses for a SPA. """ variables = self.get_variables() if variables["custom_error_responses"]: return [ cloudfront.CustomErrorResponse( ErrorCode=response["ErrorCode"], ResponseCode=response["ResponseCode"], ResponsePagePath=response["ResponsePagePath"], ) for response in variables["custom_error_responses"] ] if variables["NonSPAMode"]: return [] return [ cloudfront.CustomErrorResponse( ErrorCode=404, ResponseCode=200, ResponsePagePath="/index.html" ) ] def _get_cloudfront_bucket_policy_statements(self, bucket, oai): return [ Statement( Action=[awacs.s3.GetObject], Effect=Allow, Principal=Principal("CanonicalUser", oai.get_att("S3CanonicalUserId")), Resource=[Join("", [bucket.get_att("Arn"), "/*"])], ), Statement( Action=[awacs.s3.ListBucket], Effect=Allow, Principal=Principal("CanonicalUser", oai.get_att("S3CanonicalUserId")), Resource=[bucket.get_att("Arn")], ), ]