"""AWS S3 bucket."""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING, Any, List, Optional, Union
from botocore.exceptions import ClientError
from .....compat import cached_property
from .....mixins import DelCachedPropMixin
from .._response import BaseResponse
from ._sync_handler import S3SyncHandler
if TYPE_CHECKING:
import boto3
from mypy_boto3_s3.client import S3Client
from mypy_boto3_s3.type_defs import (
CreateBucketOutputTypeDef,
GetBucketVersioningOutputTypeDef,
)
from .....context import CfnginContext, RunwayContext
LOGGER = logging.getLogger(__name__.replace("._", "."))
[docs]class Bucket(DelCachedPropMixin):
"""AWS S3 bucket."""
[docs] def __init__(
self,
context: Union[CfnginContext, RunwayContext],
name: str,
region: Optional[str] = None,
) -> None:
"""Instantiate class.
Args:
context: Current context object.
name: The name of the bucket.
region: The bucket's region.
"""
self.__ctx = context
self._region = region
self.name = name
@cached_property
def client(self) -> S3Client:
"""Create or reuse a boto3 client."""
return self.session.client("s3")
@property
def exists(self) -> bool:
"""Check whether the bucket exists and is accessible."""
return not self.forbidden and not self.not_found
@property
def forbidden(self) -> bool:
"""Check whether access to the bucket is forbidden."""
return self.head.metadata.forbidden
@cached_property
def head(self):
"""Check if a bucket exists and you have permission to access it.
To use this operation, the user must have permissions to perform the
``s3:ListBucket`` action.
This is a low level action that returns the raw result of the request.
"""
try:
return BaseResponse(**self.client.head_bucket(Bucket=self.name) or {})
except ClientError as err:
LOGGER.debug(
'received an error from AWS S3 when trying to head bucket "%s"',
self.name,
exc_info=True,
)
return BaseResponse.parse_obj(err.response)
@property
def not_found(self) -> bool:
"""Check whether the bucket exists."""
return self.head.metadata.not_found
@cached_property
def session(self) -> boto3.Session:
"""Create cached boto3 session."""
return self.__ctx.get_session(region=self._region)
[docs] def create(self, **kwargs: Any) -> Optional[CreateBucketOutputTypeDef]:
"""Create an S3 Bucket if it does not already exist.
Bucket creation will be skipped if it already exists or access is forbidden.
Keyword arguments are passed directly to the boto3 method.
Returns:
boto3 response.
"""
if self.forbidden or self.exists:
LOGGER.debug(
'skipped creating bucket "%s": %s',
self.name,
"access denied" if self.forbidden else "bucket already exists",
)
return None
kwargs["Bucket"] = self.name
if self.client.meta.region_name != "us-east-1":
kwargs.setdefault("CreateBucketConfiguration", {})
kwargs["CreateBucketConfiguration"].update(
{"LocationConstraint": self.client.meta.region_name}
)
LOGGER.debug("creating bucket: %s", json.dumps(kwargs))
self._del_cached_property("head")
return self.client.create_bucket(**kwargs)
[docs] def enable_versioning(self) -> None:
"""Enable versioning on the bucket if not already enabled."""
config = self.get_versioning()
if config.get("Status") == "Enabled":
LOGGER.debug(
'did not modify versioning policy for bucket "%s"; already enabled',
self.name,
)
return
config["Status"] = "Enabled"
self.client.put_bucket_versioning(
Bucket=self.name,
VersioningConfiguration={
"Status": "Enabled",
"MFADelete": config.get("MFADelete", "Disabled"),
},
)
LOGGER.debug('enabled versioning for bucket "%s"', self.name)
[docs] def get_versioning(self) -> GetBucketVersioningOutputTypeDef:
"""Get the versioning state of a bucket.
To retrieve the versioning state of a bucket, you must be the bucket owner.
Returns:
The current versioning state of the bucket containing ``Status``
and ``MFADelete`` (only if this has ever been configured).
"""
return self.client.get_bucket_versioning(Bucket=self.name)
[docs] def sync_from_local(
self,
src_directory: str,
*,
delete: bool = False,
exclude: Optional[List[str]] = None,
follow_symlinks: bool = False,
include: Optional[List[str]] = None,
prefix: Optional[str] = None,
) -> None:
"""Sync local directory to the S3 Bucket.
Args:
src_directory: Local directory to sync to S3.
delete: If true, files that exist in the destination but not in the
source are deleted.
exclude: List of patterns for files/objects to exclude.
follow_symlinks: If symlinks should be followed.
include: List of patterns for files/objects to explicitly include.
prefix: Optional prefix to append to synced objects.
"""
S3SyncHandler(
context=self.__ctx,
delete=delete,
dest=self.format_bucket_path_uri(prefix=prefix),
exclude=exclude,
follow_symlinks=follow_symlinks,
include=include,
session=self.session,
src=src_directory,
).run()
[docs] def sync_to_local(
self,
dest_directory: str,
*,
delete: bool = False,
exclude: Optional[List[str]] = None,
follow_symlinks: bool = False,
include: Optional[List[str]] = None,
prefix: Optional[str] = None,
) -> None:
"""Sync S3 bucket to local directory.
Args:
dest_directory: Local directory to sync S3 objects to.
delete: If true, files that exist in the destination but not in the
source are deleted.
exclude: List of patterns for files/objects to exclude.
follow_symlinks: If symlinks should be followed.
include: List of patterns for files/objects to explicitly include.
prefix: Optional prefix to append to synced objects.
"""
S3SyncHandler(
context=self.__ctx,
delete=delete,
dest=dest_directory,
exclude=exclude,
follow_symlinks=follow_symlinks,
include=include,
session=self.session,
src=self.format_bucket_path_uri(prefix=prefix),
).run()
[docs] def __bool__(self) -> bool:
"""Implement evaluation of instances as a bool."""
return self.exists