# Source code for runway.cfngin.hooks.ecr._purge_repository

"""Purge all images from an ECR repository."""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Dict, List

from ....utils import BaseModel

if TYPE_CHECKING:
    from mypy_boto3_ecr.client import ECRClient
    from mypy_boto3_ecr.type_defs import ImageIdentifierTypeDef

    from ....context import CfnginContext

LOGGER = logging.getLogger(__name__.replace("._", "."))

class HookArgs(BaseModel):
    """Hook arguments for ``purge_repository``.

    ``purge_repository`` builds an instance of this model from the keyword
    arguments it receives by calling ``HookArgs.parse_obj(kwargs)``.

    """

    # Read by ``purge_repository`` and passed on as ``repository_name`` when
    # listing and deleting the repository's images.
    repository_name: str
    """Name of the repository to purge."""

def delete_ecr_images(
    client: ECRClient,
    image_ids: List[ImageIdentifierTypeDef],
    repository_name: str,
) -> None:
    """Delete images from an ECR repository.

    The identifiers are submitted in batches because a single
    ``batch_delete_image`` request accepts at most 100 image identifiers.
    Every batch is attempted, even if an earlier batch reported failures,
    so that as many images as possible are removed.

    Args:
        client: ECR client used to issue the ``batch_delete_image`` requests.
        image_ids: Identifiers of the images to delete. An empty list results
            in no request being made.
        repository_name: Name of the repository that contains the images.

    Raises:
        ValueError: At least one response contained a non-empty ``failures``
            list. Each failure is logged before the exception is raised.

    """
    max_batch_size = 100  # upper bound on ``imageIds`` per BatchDeleteImage call
    failures: List[Any] = []
    for start in range(0, len(image_ids), max_batch_size):
        response = client.batch_delete_image(
            repositoryName=repository_name,
            imageIds=image_ids[start : start + max_batch_size],
        )
        failures.extend(response.get("failures") or [])

    if not failures:
        return

    for msg in failures:
        image_id = msg.get("imageId", {})
        # NOTE(review): the log level was not recoverable from the reviewed
        # source; ``error`` is used because an exception follows - confirm.
        LOGGER.error(
            "failed to delete image %s: (%s) %s",
            # identify the image by digest, falling back to its tag
            image_id.get("imageDigest") or image_id.get("imageTag"),
            msg.get("failureCode"),
            msg.get("failureReason"),
        )
    raise ValueError("failures present in response")

def list_ecr_images(
    client: ECRClient, repository_name: str
) -> List[ImageIdentifierTypeDef]:
    """List all images in an ECR repository.

    All pages of ``list_images`` results are retrieved by following
    ``nextToken``. Images are reported by digest only, one entry per unique
    digest, so an image that carries several tags appears once.

    Args:
        client: ECR client used to issue the ``list_images`` requests.
        repository_name: Name of the repository to list.

    Returns:
        One ``{"imageDigest": ...}`` identifier per unique digest, in the
        order the digests were first returned. An empty list is returned when
        the repository does not exist.

    """
    image_ids: List[ImageIdentifierTypeDef] = []
    try:
        response = client.list_images(
            repositoryName=repository_name, filter={"tagStatus": "ANY"}
        )
        image_ids.extend(response.get("imageIds", []))
        # keep requesting pages for as long as the service hands back a token
        while response.get("nextToken"):
            response = client.list_images(
                filter={"tagStatus": "ANY"},
                nextToken=response["nextToken"],
                repositoryName=repository_name,
            )
            image_ids.extend(response.get("imageIds", []))
    except client.exceptions.RepositoryNotFoundException:
        LOGGER.info("repository %s does not exist", repository_name)
        return []

    # ``dict.fromkeys`` removes duplicate digests while keeping first-seen
    # order, which makes the result deterministic (a set would not be).
    unique_digests = dict.fromkeys(
        image["imageDigest"] for image in image_ids if image.get("imageDigest")
    )
    return [{"imageDigest": digest} for digest in unique_digests]

def purge_repository(
    context: CfnginContext, *__args: Any, **kwargs: Any
) -> Dict[str, str]:
    """Purge all images from an ECR repository.

    Args:
        context: CFNgin context object. Its session is used to create the
            ``ecr`` client.
        **kwargs: Hook arguments, parsed with ``HookArgs.parse_obj``.

    Returns:
        ``{"status": "skipped"}`` when no images were found to delete,
        otherwise ``{"status": "success"}``.

    Raises:
        ValueError: Propagated from ``delete_ecr_images`` when the delete
            response reports failures.

    """
    args = HookArgs.parse_obj(kwargs)
    client = context.get_session().client("ecr")
    image_ids = list_ecr_images(client, repository_name=args.repository_name)
    if not image_ids:
        # nothing to delete; ``list_ecr_images`` also returns an empty list
        # when the repository does not exist
        LOGGER.info("no images found in repository %s", args.repository_name)
        return {"status": "skipped"}
    delete_ecr_images(
        client, image_ids=image_ids, repository_name=args.repository_name
    )
    LOGGER.info("purged all images from repository")
    return {"status": "success"}