Source code for runway.cfngin.hooks.cleanup_s3

"""CFNgin hook for cleaning up resources prior to CFN stack deletion."""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

from botocore.exceptions import ClientError

from ...utils import BaseModel

if TYPE_CHECKING:
    from ...context import CfnginContext

LOGGER = logging.getLogger(__name__)


[docs]class PurgeBucketHookArgs(BaseModel): """Hook arguments for ``purge_bucket``.""" bucket_name: str """Name of the bucket to purge."""
[docs]def purge_bucket(context: CfnginContext, *__args: Any, **kwargs: Any) -> bool: """Delete objects in bucket.""" args = PurgeBucketHookArgs.parse_obj(kwargs) session = context.get_session() s3_resource = session.resource("s3") try: s3_resource.meta.client.head_bucket(Bucket=args.bucket_name) except ClientError as exc: if exc.response["Error"]["Code"] == "404": LOGGER.info( 'bucket "%s" does not exist; unable to complete purge', args.bucket_name ) return True raise bucket = s3_resource.Bucket(args.bucket_name) bucket.object_versions.delete() return True