Source code for runway.cfngin.hooks.acm

"""CFNgin hooks for AWS Certificate Manager."""

from __future__ import annotations

import logging
import time
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Optional, Type

from botocore.exceptions import ClientError
from troposphere import Ref
from troposphere.certificatemanager import Certificate as CertificateResource
from typing_extensions import Literal

from ...utils import MutableMap
from ..blueprints.variables.types import CFNString
from ..exceptions import StackDoesNotExist, StackFailed, StackUpdateBadStatus
from ..status import NO_CHANGE, SUBMITTED
from .base import Hook, HookArgsBaseModel
from .utils import BlankBlueprint

if TYPE_CHECKING:
    from mypy_boto3_acm.client import ACMClient
    from mypy_boto3_acm.type_defs import ResourceRecordTypeDef
    from mypy_boto3_route53.client import Route53Client
    from mypy_boto3_route53.type_defs import ChangeTypeDef

    from ...context import CfnginContext
    from ..blueprints.base import Blueprint
    from ..providers.aws.default import Provider
    from ..stack import Stack
    from ..status import Status

LOGGER = logging.getLogger(__name__)


[docs]class HookArgs(HookArgsBaseModel): """Hook arguments.""" alt_names: List[str] = [] domain: str hosted_zone_id: str stack_name: Optional[str] = None ttl: int = 300
[docs]class Certificate(Hook): r"""Hook for managing a **AWS::CertificateManager::Certificate**. Keyword Args: alt_names (Optional[List[str]]): Additional FQDNs to be included in the Subject Alternative Name extension of the ACM certificate. For example, you can add www.example.net to a certificate for which the domain field is www.example.com if users can reach your site by using either name. domain (str): The fully qualified domain name (FQDN), such as www.example.com, with which you want to secure an ACM certificate. Use an asterisk (``*``) to create a wildcard certificate that protects several sites in the same domain. For example, \*.example.com protects www.example.com, site.example.com, and images.example.com. hosted_zone_id (str): The ID of the Route 53 Hosted Zone that contains the resource record sets that you want to change. This must exist in the same account that the certificate will be created in. stack_name (Optional[str]): Provide a name for the stack used to create the certificate. If not provided, the domain is used (replacing ``.`` with ``-``). ttl (Optional[int]): The resource record cache time to live (TTL), in seconds. (*default:* ``300``) .. rubric:: Example .. code-block:: yaml pre_deploy: example-wildcard-cert: path: runway.cfngin.hooks.acm.Certificate required: true args: domain: '*.example.com' hosted_zone_id: ${xref example-com::HostedZoneId} """ ARGS_PARSER: ClassVar[Type[HookArgs]] = HookArgs acm_client: ACMClient args: HookArgs blueprint: Blueprint r53_client: Route53Client stack: Stack template_description: str
[docs] def __init__( self, context: CfnginContext, provider: Provider, **kwargs: Any ) -> None: """Instantiate class. Args: context: Context instance. (passed in by CFNgin) provider: Provider instance. (passed in by CFNgin) """ super().__init__(context, provider, **kwargs) self.template_description = self.get_template_description() self.stack_name = self.args.stack_name or self.args.domain.replace(".", "-") self.properties = MutableMap( **{ "DomainName": self.args.domain, "SubjectAlternativeNames": self.args.alt_names, "Tags": self.tags, "ValidationMethod": "DNS", } ) self.blueprint = self._create_blueprint() session = self.context.get_session() self.acm_client = session.client("acm") self.r53_client = session.client("route53") self.stack = self.generate_stack( variables={ "ValidateRecordTTL": self.args.ttl, "DomainName": self.args.domain, } )
def _create_blueprint(self) -> Blueprint: """Create CFNgin Blueprint.""" var_description = ( "NO NOT CHANGE MANUALLY! Used to track the " "state of a value set outside of CloudFormation" " using a Runway hook." ) class _BlankBlueprint(BlankBlueprint): VARIABLES = { "DomainName": {"type": CFNString, "description": var_description}, "ValidateRecordTTL": { "type": CFNString, "description": var_description, }, } blueprint = _BlankBlueprint(self.stack_name, self.context) blueprint.template.set_version("2010-09-09") blueprint.template.set_description(self.template_description) cert = blueprint.template.add_resource( CertificateResource("Certificate", **self.properties.data) ) blueprint.add_output(f"{cert.title}Arn", cert.ref()) blueprint.add_output("ValidateRecordTTL", Ref("ValidateRecordTTL")) blueprint.add_output("DomainName", Ref("DomainName")) return blueprint
[docs] def domain_changed(self) -> bool: """Check to ensure domain has not changed for existing stack.""" try: stack_info = self.provider.get_stack(self.stack.fqn) if self.provider.is_stack_recreatable(stack_info): LOGGER.debug( "stack is in a recreatable state; domain change does not matter" ) return False if self.provider.is_stack_in_progress( stack_info ) or self.provider.is_stack_rolling_back(stack_info): LOGGER.debug("stack is in progress; can't check for domain change") return False if ( self.args.domain != self.provider.get_outputs(self.stack.fqn)["DomainName"] ): LOGGER.error( '"domain" can\'t be changed for existing ' 'certificate in stack "%s"', self.stack.fqn, ) return True except StackDoesNotExist: pass except KeyError: LOGGER.warning( 'stack "%s" is missing output DomainName; update may fail', self.stack.fqn, ) return False
[docs] def get_certificate(self, interval: int = 5) -> str: """Get the certificate being created by a CloudFormation. Args: interval: Number of seconds to wait between attempts. Returns: Certificate ARN. """ response = self.provider.cloudformation.describe_stack_resources( StackName=self.stack.fqn, LogicalResourceId="Certificate" )["StackResources"] if response: # can be returned without having a PhysicalResourceId if "PhysicalResourceId" in response[0]: return response[0]["PhysicalResourceId"] LOGGER.debug("waiting for certificate to be created...") time.sleep(interval) return self.get_certificate(interval=interval)
[docs] def get_validation_record( self, cert_arn: Optional[str] = None, *, interval: int = 5, status: str = "PENDING_VALIDATION", ) -> ResourceRecordTypeDef: """Get validation record from the certificate being created. Args: cert_arn: ARN of the certificate to validate. interval: Number of seconds to wait between attempts. status: Validation status to look for when finding a validation record. Typically only "PENDING_VALIDATION" or "SUCCESS" will be used. Raises: ValueError: No pending or too many pending certificates. """ if not cert_arn: cert_arn = self.get_certificate() cert = self.acm_client.describe_certificate(CertificateArn=cert_arn).get( "Certificate", {} ) try: domain_validation = [ opt for opt in cert["DomainValidationOptions"] if opt["ValidationStatus"] == status ] except KeyError: LOGGER.debug( "waiting for DomainValidationOptions to become " "available for the certificate..." ) time.sleep(interval) return self.get_validation_record( cert_arn=cert_arn, interval=interval, status=status ) if not domain_validation: raise ValueError( f'No validations with status "{status}" found for "{self.args.domain}"' ) if len(domain_validation) > 1: raise ValueError( f"Found {len(domain_validation)} validation options of status " f'"{status}" for "{self.args.domain}"; only one option is supported' ) try: # the validation option can exists before the record set is ready return domain_validation[0]["ResourceRecord"] except KeyError: LOGGER.debug( "waiting for DomainValidationOptions.ResourceRecord " "to become available for the certificate..." ) time.sleep(interval) return self.get_validation_record( cert_arn=cert_arn, interval=interval, status=status )
[docs] def put_record_set(self, record_set: ResourceRecordTypeDef) -> None: """Create/update a record set on a Route 53 Hosted Zone. Args: record_set: Record set to be added to Route 53. """ LOGGER.info( "adding validation record to hosted zone: %s", self.args.hosted_zone_id ) self.__change_record_set("CREATE", [record_set])
[docs] def remove_validation_records( self, records: Optional[List[ResourceRecordTypeDef]] = None ) -> None: """Remove all record set entries used to validate an ACM Certificate. Args: records: List of validation records to remove from Route 53. This can be provided in cases were the certificate has been deleted during a rollback. """ if not records: cert_arn = self.get_certificate() cert = self.acm_client.describe_certificate(CertificateArn=cert_arn).get( "Certificate", {} ) records = [ opt["ResourceRecord"] for opt in cert.get("DomainValidationOptions", []) if opt.get("ValidationMethod") == "DNS" and "ResourceRecord" in opt ] LOGGER.info( "removing %i validation record(s) from hosted zone: %s", len(records), self.args.hosted_zone_id, ) self.__change_record_set("DELETE", records)
[docs] def update_record_set(self, record_set: ResourceRecordTypeDef) -> None: """Update a validation record set when the cert has not changed. Args: record_set: Record set to be updated in Route 53. """ LOGGER.info("updating record set...") self.__change_record_set("UPSERT", [record_set])
def __change_record_set( self, action: Literal["CREATE", "DELETE", "UPSERT"], record_sets: List[ResourceRecordTypeDef], ) -> None: """Wrap boto3.client('acm').change_resource_record_sets. Args: action: Change action. record_sets: Record sets to change. """ if not record_sets: raise ValueError("Must provide one of more record sets") changes: List[ChangeTypeDef] = [ { "Action": action, "ResourceRecordSet": { "Name": record["Name"], "Type": record["Type"], "ResourceRecords": [{"Value": record["Value"]}], "TTL": self.args.ttl, }, } for record in record_sets ] LOGGER.debug( 'making the following changes to hosted zone "%s":\n%s', self.args.hosted_zone_id, changes, ) self.r53_client.change_resource_record_sets( HostedZoneId=self.args.hosted_zone_id, ChangeBatch={"Comment": self.template_description, "Changes": changes}, )
[docs] def deploy(self, status: Optional[Status] = None) -> Dict[str, str]: """Deploy an ACM Certificate.""" record = None try: if self.domain_changed(): return {} if not status: status = self.deploy_stack() cert_arn = self.get_certificate() result = {"CertificateArn": cert_arn} if status == NO_CHANGE: LOGGER.debug("stack did not change; no action required") return result if status == SUBMITTED: if status.reason == "creating new stack": record = self.get_validation_record(cert_arn) self.put_record_set(record) LOGGER.info( "waiting for certificate to validate; " "this can take upwards of 30 minutes to " "complete..." ) elif status.reason == "updating existing stack": # get the cert ARN again in case it changed during update cert_arn = self.get_certificate() record = self.get_validation_record(cert_arn, status="SUCCESS") self.update_record_set(record) elif status.reason == "destroying stack for re-creation": # handle recreating a stack in a failed state return self.deploy( status=self._wait_for_stack( self._deploy_action, last_status=status, till_reason="creating new stack", ) ) self._wait_for_stack(self._deploy_action, last_status=status) return result except ( self.r53_client.exceptions.InvalidChangeBatch, self.r53_client.exceptions.NoSuchHostedZone, StackFailed, ) as err: LOGGER.error(err) self.destroy( records=[record] if record else None, skip_r53=isinstance( # type: ignore err, ( self.r53_client.exceptions.InvalidChangeBatch, self.r53_client.exceptions.NoSuchHostedZone, ), ), ) except StackUpdateBadStatus as err: # don't try to destroy the stack when it can be in progress LOGGER.error(err) return {}
[docs] def destroy( self, records: Optional[List[ResourceRecordTypeDef]] = None, skip_r53: bool = False, ) -> bool: """Destroy an ACM certificate. Args: records: List of validation records to remove from Route 53. This can be provided in cases were the certificate has been deleted during a rollback. skip_r53: Skip the removal of validation records. """ if not skip_r53: try: self.remove_validation_records(records) except ( self.r53_client.exceptions.InvalidChangeBatch, self.r53_client.exceptions.NoSuchHostedZone, self.acm_client.exceptions.ResourceNotFoundException, ) as err: # these error are fine if they happen during destruction but # could require manual steps to finish cleanup. LOGGER.warning( "deletion of the validation records failed with error:\n%s", err ) except ClientError as err: if err.response["Error"]["Message"] != ( f"Stack with id {self.stack.fqn} does not exist" ): raise LOGGER.warning( "deletion of the validation records failed with error:\n%s", err ) else: LOGGER.info("deletion of validation records was skipped") self.destroy_stack(wait=True) return True
[docs] def post_deploy(self) -> Dict[str, str]: """Run during the **post_deploy** stage.""" return self.deploy()
[docs] def post_destroy(self) -> bool: """Run during the **post_destroy** stage.""" return self.destroy()
[docs] def pre_deploy(self) -> Dict[str, str]: """Run during the **pre_deploy** stage.""" return self.deploy()
[docs] def pre_destroy(self) -> bool: """Run during the **pre_destroy** stage.""" return self.destroy()