"""CFNgin hook for syncing static website to S3 bucket."""
# TODO move to runway.cfngin.hooks on next major release
import hashlib
import json
import logging
import os
import time
from operator import itemgetter
import yaml

from ...cfngin.lookups.handlers.output import OutputLookup
from ...core.providers import aws

LOGGER = logging.getLogger(__name__)


def get_archives_to_prune(archives, hook_data):
    """Return the list of archive keys to delete.

    Args:
        archives (List[Dict[str, Any]]): The full list of file archives.
        hook_data (Dict[str, Any]): CFNgin hook data.

    Returns:
        List[str]: Keys of the archives that should be deleted.

    """
files_to_skip = []
for i in ["current_archive_filename", "old_archive_filename"]:
if hook_data.get(i):
files_to_skip.append(hook_data[i])
    archives.sort(  # sort from oldest to newest
        key=itemgetter("LastModified"), reverse=False
    )
    # keep the 15 most recent archives; everything older is eligible for deletion
    return [i["Key"] for i in archives[:-15] if i["Key"] not in files_to_skip]


def sync(context, provider, **kwargs):
    """Sync static website to S3 bucket.

    Args:
        context (:class:`runway.cfngin.context.Context`): The context
            instance.
        provider (:class:`runway.cfngin.providers.base.BaseProvider`):
            The provider instance.

    """
session = context.get_session()
bucket_name = OutputLookup.handle(
kwargs.get("bucket_output_lookup"), provider=provider, context=context
)
build_context = context.hook_data["staticsite"]
invalidate_cache = False
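    # sync_extra_files (see the sketch at the end of this module) uploads any
    # out-of-date extra files and returns the keys it changed; a non-empty
    # result means the CDN cache must be invalidated below.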
extra_files = sync_extra_files(
context,
bucket_name,
kwargs.get("extra_files", []),
hash_tracking_parameter=build_context.get("hash_tracking_parameter"),
)
if extra_files:
invalidate_cache = True
if build_context["deploy_is_current"]:
LOGGER.info("skipped upload; latest version already deployed")
else:
# Using the awscli for s3 syncing is incredibly suboptimal, but on
# balance it's probably the most stable/efficient option for syncing
# the files until https://github.com/boto/boto3/issues/358 is resolved
sync_args = [
"s3",
"sync",
build_context["app_directory"],
"s3://%s/" % bucket_name,
"--delete",
]
for extra_file in [f["name"] for f in kwargs.get("extra_files", [])]:
sync_args.extend(["--exclude", extra_file])
aws.cli(sync_args)
invalidate_cache = True
if kwargs.get("cf_disabled", False):
display_static_website_url(kwargs.get("website_url"), provider, context)
elif invalidate_cache:
distribution = get_distribution_data(context, provider, **kwargs)
invalidate_distribution(session, **distribution)
LOGGER.info("sync complete")
if not build_context["deploy_is_current"]:
update_ssm_hash(context, session)
prune_archives(context, session)
return True


def display_static_website_url(website_url_handle, provider, context):
    """Display the static website URL from its stack output handle.

    Args:
        website_url_handle (str): The output handle for the website URL.
        provider (:class:`runway.cfngin.providers.base.BaseProvider`):
            The provider instance.
        context (:class:`runway.cfngin.context.Context`): The context
            instance.

    """
bucket_url = OutputLookup.handle(
website_url_handle, provider=provider, context=context
)
LOGGER.info("STATIC WEBSITE URL: %s", bucket_url)


def update_ssm_hash(context, session):
    """Update the SSM parameter with the new tracking hash.

    Args:
        context (:class:`runway.cfngin.context.Context`): The context
            instance.
        session (:class:`runway.cfngin.session.Session`): The CFNgin
            session.

    """
build_context = context.hook_data["staticsite"]
if not build_context.get("hash_tracking_disabled"):
hash_param = build_context["hash_tracking_parameter"]
hash_value = build_context["hash"]
LOGGER.info("updating SSM parameter %s with hash %s", hash_param, hash_value)
set_ssm_value(
session,
hash_param,
hash_value,
"Hash of currently deployed static website source",
)
return True


def get_distribution_data(context, provider, **kwargs):
    """Retrieve information about the CloudFront distribution.

    Args:
        context (:class:`runway.cfngin.context.Context`): The context
            instance.
        provider (:class:`runway.cfngin.providers.base.BaseProvider`):
            The provider instance.

    """
    LOGGER.verbose("retrieving distribution data")
return {
"identifier": OutputLookup.handle(
kwargs.get("distributionid_output_lookup"),
provider=provider,
context=context,
),
"domain": OutputLookup.handle(
kwargs.get("distributiondomain_output_lookup"),
provider=provider,
context=context,
),
"path": kwargs.get("distribution_path", "/*"),
}


def invalidate_distribution(session, identifier="", path="", domain="", **_):
    """Invalidate the current CloudFront distribution.

    Args:
        session (:class:`runway.cfngin.session.Session`): The CFNgin
            session.
        identifier (str): The distribution ID.
        path (str): The path pattern to invalidate.
        domain (str): The distribution domain.

    """
LOGGER.info("invalidating CloudFront distribution: %s (%s)", identifier, domain)
cf_client = session.client("cloudfront")
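    # CallerReference must be unique per invalidation request; the current
    # timestamp is unique enough for this once-per-deployment call.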
cf_client.create_invalidation(
DistributionId=identifier,
InvalidationBatch={
"Paths": {"Quantity": 1, "Items": [path]},
"CallerReference": str(time.time()),
},
)
LOGGER.info("CloudFront invalidation complete")
return True


def prune_archives(context, session):
    """Prune old site archives from the artifact bucket.

    Args:
        context (:class:`runway.cfngin.context.Context`): The context
            instance.
        session (:class:`runway.cfngin.session.Session`): The CFNgin
            session.

    """
LOGGER.info("cleaning up old site archives...")
archives = []
s3_client = session.client("s3")
list_objects_v2_paginator = s3_client.get_paginator("list_objects_v2")
response_iterator = list_objects_v2_paginator.paginate(
Bucket=context.hook_data["staticsite"]["artifact_bucket_name"],
Prefix=context.hook_data["staticsite"]["artifact_key_prefix"],
)
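    # list_objects_v2 returns at most 1,000 keys per response; the paginator
    # transparently walks every page.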
for page in response_iterator:
archives.extend(page.get("Contents", []))
archives_to_prune = get_archives_to_prune(archives, context.hook_data["staticsite"])
# Iterate in chunks of 1000 to match delete_objects limit
for objects in [
archives_to_prune[i : i + 1000] for i in range(0, len(archives_to_prune), 1000)
]:
s3_client.delete_objects(
Bucket=context.hook_data["staticsite"]["artifact_bucket_name"],
Delete={"Objects": [{"Key": i} for i in objects]},
)
return True


def auto_detect_content_type(filename):
    """Auto detect the content type based on the filename.

    Args:
        filename (str): A filename to use to auto detect the content type.

    Returns:
        Optional[str]: The content type of the file, or ``None`` if it
            could not be detected.

    """
_, ext = os.path.splitext(filename)
if ext == ".json":
return "application/json"
if ext in [".yml", ".yaml"]:
return "text/yaml"
return None
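
# For example:
#     auto_detect_content_type("config.json") -> "application/json"
#     auto_detect_content_type("overrides.yml") -> "text/yaml"
#     auto_detect_content_type("index.html") -> None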


def get_content_type(extra_file):
    """Return the content type of the file.

    Args:
        extra_file (Dict[str, Union[str, Dict[str, Any]]]): The extra file
            configuration.

    Returns:
        Optional[str]: The content type of the extra file. If
            ``content_type`` is provided, it is returned as-is; otherwise
            it is auto detected from the file name.

    """
return extra_file.get(
"content_type", auto_detect_content_type(extra_file.get("name"))
)


def get_content(extra_file):
    """Get serialized content based on content_type.

    Args:
        extra_file (Dict[str, Union[str, Dict[str, Any]]]): The extra file
            configuration.

    Returns:
        str: Content serialized according to the ``content_type``.

    """
content_type = extra_file.get("content_type")
content = extra_file.get("content")
if content:
if isinstance(content, (dict, list)):
if content_type == "application/json":
return json.dumps(content)
if content_type == "text/yaml":
return yaml.safe_dump(content)
raise ValueError(
'"content_type" must be json or yaml if "content" is not a string'
)
if not isinstance(content, str):
raise TypeError("unsupported content: %s" % type(content))
return content
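
# For example, a dict with a JSON content type is serialized before upload
# (hypothetical extra-file entry):
#     get_content({"content_type": "application/json", "content": {"a": 1}})
#     -> '{"a": 1}'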


def get_ssm_value(session, name):
    """Get the SSM parameter value.

    Args:
        session (:class:`runway.cfngin.session.Session`): The CFNgin
            session.
        name (str): The parameter name.

    Returns:
        Optional[str]: The parameter value, or ``None`` if the parameter
            does not exist.

    """
ssm_client = session.client("ssm")
try:
return ssm_client.get_parameter(Name=name)["Parameter"]["Value"]
except ssm_client.exceptions.ParameterNotFound:
return None


def set_ssm_value(session, name, value, description=""):
    """Set the SSM parameter, overwriting any existing value.

    Args:
        session (:class:`runway.cfngin.session.Session`): The CFNgin
            session.
        name (str): The name of the parameter.
        value (str): The value of the parameter.
        description (str): A description of the parameter.

    """
ssm_client = session.client("ssm")
ssm_client.put_parameter(
Name=name, Description=description, Value=value, Type="String", Overwrite=True
)
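

# ``sync_extra_files`` is called by ``sync`` above but is not included in this
# excerpt. The sketch below is a minimal stand-in built only from the helpers
# defined in this module; the shipped implementation may differ (e.g. it may
# compare content hashes against the ``hash_tracking_parameter`` to skip
# unchanged files, which this sketch does not attempt, so that keyword is
# accepted via ``**kwargs`` but unused here).
def sync_extra_files(context, bucket, extra_files, **kwargs):
    """Upload extra files to the bucket; return the names that were uploaded.

    Args:
        context (:class:`runway.cfngin.context.Context`): The context
            instance.
        bucket (str): Name of the website bucket.
        extra_files (List[Dict[str, Any]]): Extra file configurations.

    Returns:
        List[str]: Names of the files that were uploaded.

    """
    if not extra_files:
        return []
    s3_client = context.get_session().client("s3")
    uploaded = []
    for extra_file in extra_files:
        put_args = {"Bucket": bucket, "Key": extra_file["name"]}
        content = get_content(extra_file)  # serializes dict/list content
        if content is not None:
            put_args["Body"] = content.encode()
        content_type = get_content_type(extra_file)
        if content_type:
            put_args["ContentType"] = content_type
        LOGGER.info("uploading extra file: %s", extra_file["name"])
        s3_client.put_object(**put_args)
        uploaded.append(extra_file["name"])
    return uploaded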