Source code for runway.module.staticsite

"""Static website module."""
import logging
import os
import sys
import tempfile
from typing import Any, Dict  # pylint: disable=unused-import

import yaml

from .._logging import PrefixAdaptor
from ..util import YamlDumper
from . import RunwayModule
from .cloudformation import CloudFormation

LOGGER = logging.getLogger(__name__)


[docs]def add_url_scheme(url): """Add the scheme to an existing url. Args: url (str): The current url. """ if url.startswith("https://") or url.startswith("http://"): return url newurl = "https://%s" % url return newurl
[docs]class StaticSite(RunwayModule): """Static website Runway Module.""" def __init__(self, context, path, options=None): """Initialize.""" super(StaticSite, self).__init__(context, path, options) self.user_options = self.options.get("options", {}) self.parameters = self.options.get("parameters") # type: Dict[str, Any] self.region = self.context.env.aws_region # logger needs to be created here to use the correct logger self.logger = PrefixAdaptor(self.name, LOGGER) self._ensure_valid_environment_config() self._ensure_cloudfront_with_auth_at_edge() self._ensure_correct_region_with_auth_at_edge()
[docs] def plan(self): """Create website CFN module and run stacker diff.""" if self.parameters: self._setup_website_module(command="plan") else: self.logger.info("skipped; environment required but not defined")
[docs] def deploy(self): """Create website CFN module and run stacker build.""" if self.parameters: if self.parameters.get("staticsite_cf_disable", False) is False: self.logger.warning( "initial creation of & updates to distributions can take " "up to an hour to complete" ) # Auth@Edge warning about subsequent deploys if ( self.parameters.get("staticsite_auth_at_edge", False) and not self.parameters.get("staticsite_aliases", False) and self.context.is_interactive ): self.logger.warning( "A hook that is part of the dependencies stack of " "the Auth@Edge static site deployment is designed " "to verify that the correct Callback URLs are " "being used when a User Pool Client already " "exists for the application. This ensures that " "there is no interruption of service while the " "deployment reaches the stage where the Callback " "URLs are updated to that of the Distribution. " "Because of this you may receive a change set " "prompt on subsequent deploys." ) self._setup_website_module(command="deploy") else: self.logger.info("skipped; environment required but not defined")
[docs] def destroy(self): """Create website CFN module and run stacker destroy.""" if self.parameters: self._setup_website_module(command="destroy") else: self.logger.info("skipped; environment required but not defined")
def _setup_website_module( self, # type: StaticSite command, # type: str ): # type(...) -> return None """Create CFNgin configuration for website module.""" self.logger.info("generating CFNgin config...") module_dir = self._create_module_directory() self._create_dependencies_yaml(module_dir) self._create_staticsite_yaml(module_dir) # Earlier Runway versions included a CFN stack with a state machine # that attempted to automatically clean up the orphaned Lambda@Edge # functions. This was found to be unreliable and has been removed. # For a period of time (e.g. until the next major release) leaving this # in to automatically delete the stack. Not a major priority to have # Runway delete the old `-cleanup` stack, as the resources in it don't # have any costs when unused. if command == "destroy" and ( self.parameters.get("staticsite_auth_at_edge") or self.parameters.get("staticsite_rewrite_index_index") ): self._create_cleanup_yaml(module_dir) cfn = CloudFormation( self.context, module_dir, {i: self.options[i] for i in self.options if i != "class_path"}, ) self.logger.info("%s (in progress)", command) getattr(cfn, command)() self.logger.info("%s (complete)", command) def _create_module_directory(self): module_dir = tempfile.mkdtemp() self.logger.debug("using temporary directory: %s", module_dir) return module_dir def _create_dependencies_yaml(self, module_dir): pre_build = [] pre_destroy = [ { "path": "runway.hooks.cleanup_s3.purge_bucket", "required": True, "args": {"bucket_rxref_lookup": "%s-dependencies::%s" % (self.name, i)}, } for i in ["AWSLogBucketName", "ArtifactsBucketName"] ] if self.parameters.get("staticsite_auth_at_edge", False): if not self.parameters.get("staticsite_aliases"): # Retrieve the appropriate callback urls from the User Pool Client pre_build.append( { "path": "runway.hooks.staticsite.auth_at_edge.callback_url_retriever.get", "required": True, "data_key": "aae_callback_url_retriever", "args": { "user_pool_arn": self.parameters.get( "staticsite_user_pool_arn", "" ), "aliases": self.parameters.get("staticsite_aliases", ""), "additional_callback_domains": self.parameters.get( "staticsite_additional_callback_domains", "" ), "stack_name": "${namespace}-%s-dependencies" % self.name, }, } ) if self.parameters.get("staticsite_create_user_pool"): # Retrieve the user pool id pre_destroy.append( { "path": "runway.hooks.staticsite.auth_at_edge.user_pool_id_retriever.get", "required": True, "data_key": "aae_user_pool_id_retriever", "args": self._get_user_pool_id_retriever_variables(), } ) # Delete the domain prior to trying to delete the # User Pool Client that was created pre_destroy.append( { "path": "runway.hooks.staticsite.auth_at_edge.domain_updater.delete", "required": True, "data_key": "aae_domain_updater", "args": self._get_domain_updater_variables(), } ) else: # Retrieve the user pool id pre_build.append( { "path": "runway.hooks.staticsite.auth_at_edge.user_pool_id_retriever.get", "required": True, "data_key": "aae_user_pool_id_retriever", "args": self._get_user_pool_id_retriever_variables(), } ) content = { "namespace": "${namespace}", "cfngin_bucket": "", "stacks": { "%s-dependencies" % self.name: { "class_path": "runway.blueprints.staticsite.dependencies.Dependencies", "variables": self._get_dependencies_variables(), } }, "pre_build": pre_build, "pre_destroy": pre_destroy, } with open( os.path.join(module_dir, "01-dependencies.yaml"), "w" ) as output_stream: yaml.dump(content, output_stream, default_flow_style=False) self.logger.debug( "created 01-dependencies.yaml:\n%s", yaml.dump(content, Dumper=YamlDumper) ) def _create_staticsite_yaml(self, module_dir): # Default parameter name matches build_staticsite hook hash_param = self.user_options.get("source_hashing", {}).get( "parameter", "${namespace}-%s-hash" % self.name ) nonce_secret_param = "${namespace}-%s-nonce-secret" % self.name build_staticsite_args = self.options.copy() or {} build_staticsite_args["artifact_bucket_rxref_lookup"] = ( "%s-dependencies::ArtifactsBucketName" % self.name ) build_staticsite_args["options"]["namespace"] = "${namespace}" build_staticsite_args["options"]["name"] = self.name build_staticsite_args["options"]["path"] = os.path.join( os.path.realpath(self.context.env_root), self.path ) site_stack_variables = self._get_site_stack_variables() class_path = "staticsite.StaticSite" pre_build = [ { "path": "runway.hooks.staticsite.build_staticsite.build", "required": True, "data_key": "staticsite", "args": build_staticsite_args, } ] post_build = [ { "path": "runway.hooks.staticsite.upload_staticsite.sync", "required": True, "args": { "bucket_output_lookup": "%s::BucketName" % self.name, "website_url": "%s::BucketWebsiteURL" % self.name, "extra_files": self.user_options.get("extra_files", []), "cf_disabled": site_stack_variables["DisableCloudFront"], "distributionid_output_lookup": "%s::CFDistributionId" % (self.name), "distributiondomain_output_lookup": "%s::CFDistributionDomainName" % self.name, }, } ] pre_destroy = [ { "path": "runway.hooks.cleanup_s3.purge_bucket", "required": True, "args": {"bucket_rxref_lookup": "%s::BucketName" % self.name}, } ] if self.parameters.get( "staticsite_rewrite_directory_index" ) or self.parameters.get("staticsite_auth_at_edge"): pre_destroy.append( { "path": "runway.hooks.staticsite.cleanup.warn", "required": False, "args": {"stack_relative_name": self.name}, } ) post_destroy = [ { "path": "runway.hooks.cleanup_ssm.delete_param", "args": {"parameter_name": i}, } for i in [hash_param, nonce_secret_param, "%sextra" % hash_param] ] if self.parameters.get("staticsite_auth_at_edge", False): class_path = "auth_at_edge.AuthAtEdge" pre_build.append( { "path": "runway.hooks.staticsite.auth_at_edge.user_pool_id_retriever.get", "required": True, "data_key": "aae_user_pool_id_retriever", "args": self._get_user_pool_id_retriever_variables(), } ) pre_build.append( { "path": "runway.hooks.staticsite.auth_at_edge.domain_updater.update", "required": True, "data_key": "aae_domain_updater", "args": self._get_domain_updater_variables(), } ) pre_build.append( { "path": "runway.hooks.staticsite.auth_at_edge.lambda_config.write", "required": True, "data_key": "aae_lambda_config", "args": self._get_lambda_config_variables( site_stack_variables, nonce_secret_param, self.parameters.get("staticsite_required_group"), ), } ) if not self.parameters.get("staticsite_aliases"): post_build.insert( 0, { "path": "runway.hooks.staticsite.auth_at_edge.client_updater.update", "required": True, "data_key": "client_updater", "args": self._get_client_updater_variables( self.name, site_stack_variables ), }, ) if self.parameters.get("staticsite_role_boundary_arn", False): site_stack_variables["RoleBoundaryArn"] = self.parameters[ "staticsite_role_boundary_arn" ] # If lambda_function_associations or custom_error_responses defined, # add to stack config for i in ["custom_error_responses", "lambda_function_associations"]: if self.parameters.get("staticsite_%s" % i): site_stack_variables[i] = self.parameters.pop("staticsite_%s" % i) content = { "namespace": "${namespace}", "cfngin_bucket": "", "pre_build": pre_build, "stacks": { self.name: { "class_path": "runway.blueprints.staticsite.%s" % class_path, "variables": site_stack_variables, } }, "post_build": post_build, "pre_destroy": pre_destroy, "post_destroy": post_destroy, } with open(os.path.join(module_dir, "02-staticsite.yaml"), "w") as output_stream: yaml.dump(content, output_stream, default_flow_style=False) self.logger.debug( "created 02-staticsite.yaml:\n%s", yaml.dump(content, Dumper=YamlDumper) ) def _create_cleanup_yaml(self, module_dir): content = { "namespace": "${namespace}", "cfngin_bucket": "", "stacks": { "%s-cleanup" % self.name: { "template_path": os.path.join( tempfile.gettempdir(), "thisfileisnotused.yaml" ), } }, } with open(os.path.join(module_dir, "03-cleanup.yaml"), "w") as output_stream: yaml.dump(content, output_stream, default_flow_style=False) self.logger.debug( "created 03-cleanup.yaml:\n%s", yaml.dump(content, Dumper=YamlDumper) ) def _get_site_stack_variables(self): site_stack_variables = { "Aliases": [], "DisableCloudFront": self.parameters.get("staticsite_cf_disable", False), "RewriteDirectoryIndex": self.parameters.get( "staticsite_rewrite_directory_index", "" ), "RedirectPathSignIn": "${default staticsite_redirect_path_sign_in::/parseauth}", "RedirectPathSignOut": "${default staticsite_redirect_path_sign_out::/}", "RedirectPathAuthRefresh": "${default staticsite_redirect_path_auth_refresh" "::/refreshauth}", "SignOutUrl": "${default staticsite_sign_out_url::/signout}", "WAFWebACL": self.parameters.get("staticsite_web_acl", ""), } if self.parameters.get("staticsite_aliases"): site_stack_variables["Aliases"] = self.parameters.get( "staticsite_aliases" ).split(",") if self.parameters.get("staticsite_acmcert_arn"): site_stack_variables["AcmCertificateArn"] = self.parameters[ "staticsite_acmcert_arn" ] if self.parameters.get("staticsite_acmcert_ssm_param"): self.logger.warning( "staticsite_acmcert_ssm_param option has been deprecated; " "use staticsite_acmcert_arn with an ssm lookup" ) site_stack_variables[ "AcmCertificateArn" ] = "${ssmstore ${staticsite_acmcert_ssm_param}}" if self.parameters.get("staticsite_enable_cf_logging", True): site_stack_variables["LogBucketName"] = ( "${rxref %s-dependencies::AWSLogBucketName}" % self.name ) if self.parameters.get("staticsite_auth_at_edge", False): self._ensure_auth_at_edge_requirements() site_stack_variables["UserPoolArn"] = self.parameters.get( "staticsite_user_pool_arn" ) site_stack_variables["NonSPAMode"] = self.parameters.get( "staticsite_non_spa", False ) site_stack_variables["HttpHeaders"] = self._get_http_headers() site_stack_variables["CookieSettings"] = self._get_cookie_settings() site_stack_variables["OAuthScopes"] = self._get_oauth_scopes() else: # If lambda_function_associations or custom_error_responses defined, # add to stack config. Only if not using Auth@Edge for i in ["custom_error_responses", "lambda_function_associations"]: if self.parameters.get("staticsite_%s" % i): site_stack_variables[i] = self.parameters.get("staticsite_%s" % i) self.parameters.pop("staticsite_%s" % i) return site_stack_variables def _get_cookie_settings(self): """Retrieve the cookie settings from the variables or return the default.""" if self.parameters.get("staticsite_cookie_settings"): return self.parameters.get("staticsite_cookie_settings") return { "idToken": "Path=/; Secure; SameSite=Lax", "accessToken": "Path=/; Secure; SameSite=Lax", "refreshToken": "Path=/; Secure; SameSite=Lax", "nonce": "Path=/; Secure; HttpOnly; Max-Age=1800; SameSite=Lax", } def _get_http_headers(self): """Retrieve the http headers from the variables or return the default.""" if self.parameters.get("staticsite_http_headers"): return self.parameters.get("staticsite_http_headers") return { "Content-Security-Policy": "default-src https: 'unsafe-eval' 'unsafe-inline'; " "font-src 'self' 'unsafe-inline' 'unsafe-eval' data: https:; " "object-src 'none'; " "connect-src 'self' https://*.amazonaws.com https://*.amazoncognito.com", "Strict-Transport-Security": "max-age=31536000; " "includeSubdomains; " "preload", "Referrer-Policy": "same-origin", "X-XSS-Protection": "1; mode=block", "X-Frame-Options": "DENY", "X-Content-Type-Options": "nosniff", } def _get_oauth_scopes(self): """Retrieve the oauth scopes from the variables or return the default.""" if self.parameters.get("staticsite_oauth_scopes"): return self.parameters.get("staticsite_oauth_scopes") return ["phone", "email", "profile", "openid", "aws.cognito.signin.user.admin"] def _get_supported_identity_providers(self): providers = self.parameters.get("staticsite_supported_identity_providers") if providers: return [provider.strip() for provider in providers.split(",")] return ["COGNITO"] def _get_dependencies_variables(self): variables = {"OAuthScopes": self._get_oauth_scopes()} if self.parameters.get("staticsite_auth_at_edge", False): self._ensure_auth_at_edge_requirements() variables.update( { "AuthAtEdge": self.parameters.get("staticsite_auth_at_edge", False), "SupportedIdentityProviders": self._get_supported_identity_providers(), "RedirectPathSignIn": ( "${default staticsite_redirect_path_sign_in::/parseauth}" ), "RedirectPathSignOut": ( "${default staticsite_redirect_path_sign_out::/}" ), } ) if self.parameters.get("staticsite_aliases"): variables.update( {"Aliases": self.parameters.get("staticsite_aliases").split(",")} ) if self.parameters.get("staticsite_additional_redirect_domains"): variables.update( { "AdditionalRedirectDomains": self.parameters.get( "staticsite_additional_redirect_domains" ).split(",") } ) if self.parameters.get("staticsite_create_user_pool", False): variables.update( { "CreateUserPool": self.parameters.get( "staticsite_create_user_pool", False ) } ) return variables def _get_user_pool_id_retriever_variables(self): args = { "user_pool_arn": self.parameters.get("staticsite_user_pool_arn", ""), } if self.parameters.get("staticsite_create_user_pool", False): args[ "created_user_pool_id" ] = "${rxref %s-dependencies::AuthAtEdgeUserPoolId}" % (self.name) return args def _get_domain_updater_variables(self): return { "client_id_output_lookup": "%s-dependencies::AuthAtEdgeClient" % self.name, "client_id": "${rxref %s-dependencies::AuthAtEdgeClient}" % self.name, } def _get_lambda_config_variables( self, site_stack_variables, nonce_secret_param, required_group=None ): return { "client_id": "${rxref %s-dependencies::AuthAtEdgeClient}" % self.name, "bucket": "${rxref %s-dependencies::ArtifactsBucketName}" % self.name, "cookie_settings": site_stack_variables["CookieSettings"], "http_headers": site_stack_variables["HttpHeaders"], "nonce_signing_secret_param_name": nonce_secret_param, "oauth_scopes": site_stack_variables["OAuthScopes"], "redirect_path_refresh": site_stack_variables["RedirectPathAuthRefresh"], "redirect_path_sign_in": site_stack_variables["RedirectPathSignIn"], "redirect_path_sign_out": site_stack_variables["RedirectPathSignOut"], "required_group": required_group, } def _get_client_updater_variables(self, name, site_stack_variables): aliases = [add_url_scheme(x) for x in site_stack_variables["Aliases"]] return { "alternate_domains": aliases, "client_id": "${rxref %s-dependencies::AuthAtEdgeClient}" % self.name, "distribution_domain": "${rxref %s::CFDistributionDomainName}" % name, "oauth_scopes": site_stack_variables["OAuthScopes"], "redirect_path_sign_in": site_stack_variables["RedirectPathSignIn"], "redirect_path_sign_out": site_stack_variables["RedirectPathSignOut"], "supported_identity_providers": site_stack_variables[ "SupportedIdentityProviders" ], } def _ensure_auth_at_edge_requirements(self): if not ( self.parameters.get("staticsite_user_pool_arn") or self.parameters.get("staticsite_create_user_pool") ): self.logger.error( "staticsite_user_pool_arn or staticsite_create_user_pool " "is required for Auth@Edge; " ) sys.exit(1) def _ensure_correct_region_with_auth_at_edge(self): """Exit if not in the us-east-1 region and deploying to Auth@Edge. Lambda@Edge is only available within the us-east-1 region. """ if ( self.parameters.get("staticsite_auth_at_edge", False) and self.region != "us-east-1" ): self.logger.error("Auth@Edge must be deployed in us-east-1.") sys.exit(1) def _ensure_cloudfront_with_auth_at_edge(self): """Exit if both the Auth@Edge and CloudFront disablement are true.""" if self.parameters.get("staticsite_cf_disable", False) and self.parameters.get( "staticsite_auth_at_edge", False ): self.logger.error( 'staticsite_cf_disable must be "false" if ' 'staticsite_auth_at_edge is "true"' ) sys.exit(1) def _ensure_valid_environment_config(self): """Exit if config is invalid.""" if not self.parameters.get("namespace"): self.logger.error("namespace parameter is required but not defined") sys.exit(1)