"""File lookup."""
# pylint: disable=arguments-differ,no-self-argument
# pyright: reportIncompatibleMethodOverride=none
from __future__ import annotations
import base64
import collections.abc
import json
import re
from typing import Any, Callable, Dict, List, Mapping, Sequence, Tuple, Union, overload
import yaml
from pydantic import validator
from troposphere import Base64, GenericHelperFn
from typing_extensions import Final, Literal
from ....lookups.handlers.base import LookupHandler
from ....utils import BaseModel
from ...utils import read_value_from_path
# Matches ``{{name}}`` placeholders. The captured name is one or more word
# characters, ``:`` or ``|`` (the colon is listed twice in the character
# class, which is redundant but harmless).
_PARAMETER_PATTERN = re.compile(r"{{([::|\w]+)}}")
# Input type of the recursive parameterization helper; the trailing ``Any``
# means that any object is accepted.
ParameterizedObjectTypeDef = Union[str, Mapping[str, Any], Sequence[Any], Any]
# Recursive result type of the parameterization helper: dicts and lists nested
# to any depth, with ``GenericHelperFn`` instances as leaves. The quoted names
# are forward references to this alias itself.
ParameterizedObjectReturnTypeDef = Union[
Dict[str, "ParameterizedObjectReturnTypeDef"],
GenericHelperFn,
List["ParameterizedObjectReturnTypeDef"],
]
class ArgsDataModel(BaseModel):
    """Arguments data model.

    Validates the argument dict produced by ``FileLookup.parse`` before the
    selected codec is applied.

    """

    codec: str
    """Codec that will be used to parse and/or manipulate the data."""

    @validator("codec", allow_reuse=True)
    def _validate_supported_codec(cls, v: str) -> str:
        """Validate that the selected codec is supported.

        Args:
            v: Codec name supplied in the lookup query.

        Returns:
            The codec name, unchanged, when it is a key of ``CODECS``.

        Raises:
            ValueError: The codec name is not a key of ``CODECS``.

        """
        # ``CODECS`` is defined at the bottom of the module; it is only looked
        # up when validation runs, so the forward reference is safe.
        if v not in CODECS:
            raise ValueError(f"Codec '{v}' must be one of: {', '.join(CODECS)}")
        return v
class FileLookup(LookupHandler):
    """File lookup.

    Queries have the form ``<codec>:<data or path>``. The codec name selects
    an entry of ``CODECS`` which is applied to the resolved data.

    """

    TYPE_NAME: Final[Literal["file"]] = "file"
    """Name that the Lookup is registered as."""

    @classmethod
    def parse(cls, value: str) -> Tuple[str, Dict[str, str]]:
        """Parse the value passed to the lookup.

        This overrides the default parsing to account for special requirements.

        Args:
            value: The raw value passed to a lookup.

        Returns:
            The lookup query and a dict of arguments. The query is the text
            after the first ``:`` passed through ``read_value_from_path``; the
            dict holds the text before the first ``:`` under the key ``codec``.

        Raises:
            ValueError: The value provided does not match the expected regex
                (it contains no ``:`` separator).

        """
        # Only the first colon separates codec from payload; any later colons
        # belong to the payload.
        codec, separator, data_or_path = value.partition(":")
        if not separator:
            raise ValueError(
                f"Query '{value}' doesn't match regex: "
                rf"^(?P<codec>[{'|'.join(CODECS)}]:.+$)"
            )
        # NOTE(review): read_value_from_path is defined outside this file;
        # presumably it resolves a file reference to its contents - confirm.
        return read_value_from_path(data_or_path), {"codec": codec}

    @classmethod
    def handle(cls, value: str, **_: Any) -> Any:
        """Translate a filename into the file contents.

        Args:
            value: The raw value passed to the lookup, ``<codec>:<data or path>``.
            **_: Additional keyword arguments; accepted and ignored.

        Returns:
            The result of applying the selected codec to the parsed data.

        """
        data, raw_args = cls.parse(value)
        # Validation rejects codec names that are not keys of CODECS, so the
        # dictionary lookup below cannot fail with KeyError.
        args = ArgsDataModel.parse_obj(raw_args)
        codec_fn = CODECS[args.codec]
        return codec_fn(data)
def _parameterize_string(raw: str) -> GenericHelperFn:
    """Replace ``{{name}}`` placeholders in a string with CloudFormation refs.

    Args:
        raw: Text to process. Byte strings are not supported; callers must
            decode them first.

    Returns:
        A helper-function object suitable for inclusion in a Troposphere
        template. When no placeholder is found it wraps the unmodified input;
        otherwise it wraps an ``Fn::Join`` over the literal text fragments and
        a ``Ref`` for every placeholder.

    """
    pieces: List[Any] = []
    cursor = 0
    for found in _PARAMETER_PATTERN.finditer(raw):
        start, end = found.span()
        # Literal text preceding the placeholder (may be empty), followed by
        # the reference that replaces the placeholder itself.
        pieces += [raw[cursor:start], {"Ref": found.group(1)}]
        cursor = end
    if pieces:
        # Trailing literal text after the last placeholder (may be empty).
        pieces.append(raw[cursor:])
        return GenericHelperFn({"Fn::Join": ["", pieces]})
    return GenericHelperFn(raw)
@overload
def parameterized_codec(
    raw: Union[bytes, str], b64: Literal[False] = ...
) -> GenericHelperFn:
    ...


# No default here: the runtime default of ``b64`` is ``False``, so a call that
# omits it must resolve to the overload above, not to this one.
@overload
def parameterized_codec(raw: Union[bytes, str], b64: Literal[True]) -> Base64:
    ...


def parameterized_codec(raw: Union[bytes, str], b64: bool = False) -> Any:
    """Parameterize a string, possibly encoding it as Base64 afterwards.

    Args:
        raw: String to be processed. Byte strings will be interpreted as UTF-8.
        b64: Whether to wrap the output in a Base64 CloudFormation call.

    Returns:
        :class:`troposphere.AWSHelperFn`: Output to be included in a
        CloudFormation template.

    Raises:
        UnicodeDecodeError: ``raw`` is a byte string that is not valid UTF-8.

    """
    # The placeholder regex is a str pattern, so bytes must be decoded before
    # substitution; without this a bytes argument raises TypeError.
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    result = _parameterize_string(raw)
    # Base64 is built from the data held by the parameterized result rather
    # than from the helper object itself; without Base64 the helper object is
    # returned as-is so it is emitted as a raw JSON object, not a string.
    return Base64(result.data) if b64 else result
@overload
def _parameterize_obj(obj: Union[bytes, str]) -> GenericHelperFn:
    ...


@overload
def _parameterize_obj(obj: Mapping[str, Any]) -> ParameterizedObjectReturnTypeDef:
    ...


@overload
def _parameterize_obj(obj: List[Any]) -> ParameterizedObjectReturnTypeDef:
    ...


def _parameterize_obj(
    obj: ParameterizedObjectTypeDef,
) -> ParameterizedObjectReturnTypeDef:
    """Recursively parameterize all strings contained in an object.

    Parameterizes all values of a Mapping, all items of a Sequence, a string
    (byte strings are decoded as UTF-8 first), or passes other objects through
    unmodified.

    Args:
        obj: Data to parameterize.

    Returns:
        A parameterized object to be included in a CloudFormation template.
        Mappings are converted to `dict`, Sequences are converted to `list`,
        and strings possibly replaced by compositions of function calls.

    Raises:
        UnicodeDecodeError: A byte string in the data is not valid UTF-8.

    """
    # bytes is itself a collections.abc.Sequence, so it must be handled before
    # the Sequence branch; otherwise it would be exploded into a list of ints
    # instead of being treated as text, contradicting the overload above.
    # (PyYAML's safe loader produces bytes for ``!!binary`` scalars.)
    if isinstance(obj, bytes):
        return _parameterize_string(obj.decode("utf-8"))
    # str is also a Sequence, so it too must be matched before that branch.
    if isinstance(obj, str):
        return _parameterize_string(obj)
    if isinstance(obj, collections.abc.Mapping):
        return {key: _parameterize_obj(value) for key, value in obj.items()}
    if isinstance(obj, collections.abc.Sequence):
        return [_parameterize_obj(item) for item in obj]
    # Numbers, booleans, None and any other type are passed through untouched.
    return obj
def yaml_codec(raw: str, parameterized: bool = False) -> Any:
    """YAML codec.

    Args:
        raw: YAML document to parse.
        parameterized: Whether to recursively parameterize the parsed data.

    Returns:
        The parsed data, passed through ``_parameterize_obj`` when
        ``parameterized`` is true.

    """
    # SafeLoader is used, so only standard YAML tags are constructed.
    data = yaml.load(raw, Loader=yaml.SafeLoader)
    if parameterized:
        return _parameterize_obj(data)
    return data
def json_codec(raw: str, parameterized: bool = False) -> Any:
    """JSON codec.

    Args:
        raw: JSON document to parse.
        parameterized: Whether to recursively parameterize the parsed data.

    Returns:
        The parsed data, passed through ``_parameterize_obj`` when
        ``parameterized`` is true.

    Raises:
        json.JSONDecodeError: ``raw`` is not valid JSON.

    """
    data = json.loads(raw)
    if parameterized:
        return _parameterize_obj(data)
    return data
# Registry of supported codecs, keyed by the name used in a lookup query.
# Each entry takes the resolved text and returns the transformed value. The
# insertion order is also the order shown in the "must be one of" error.
CODECS: Dict[str, Callable[[str], Any]] = {
    # Base64-encode the UTF-8 bytes of the text and return the result as text.
    "base64": lambda text: base64.b64encode(text.encode("utf8")).decode("utf-8"),
    "json": lambda text: json_codec(text, parameterized=False),
    "json-parameterized": lambda text: json_codec(text, parameterized=True),
    "parameterized": lambda text: parameterized_codec(text, b64=False),
    "parameterized-b64": lambda text: parameterized_codec(text, b64=True),
    # Return the text unchanged.
    "plain": lambda text: text,
    "yaml": lambda text: yaml_codec(text, parameterized=False),
    "yaml-parameterized": lambda text: yaml_codec(text, parameterized=True),
}