"""DynamoDB lookup."""
# pyright: reportIncompatibleMethodOverride=none
from __future__ import annotations
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
from botocore.exceptions import ClientError
from typing_extensions import Final, Literal, TypedDict
from ....lookups.handlers.base import LookupHandler
from ....utils import BaseModel
from ...utils import read_value_from_path
if TYPE_CHECKING:
from ....context import CfnginContext
_QUERY_PATTERN = r"""(?x) # <table_name>@<partition_key>:<partition_key_value>.<attribute>
^(?P<table_name>[a-zA-Z0-9\-_\.]{3,255}) # name of the DynamoDB Table
@ # delimiter
(?P<partition_key>\S*) # partition/primary key
: # delimiter
(?P<partition_key_value>[^\.]*) # value of partition/primary key
\. # delimiter
(?P<attribute>.*)$ # attribute to get
"""
"""Lookup query pattern minus region argument.
.. note::
This pattern and/or it's variable will likely change in a future release so it
should not be consumed directly by any external code.
"""
[docs]class ArgsDataModel(BaseModel):
"""Arguments data model."""
region: Optional[str] = None
"""AWS region."""
[docs]class QueryDataModel(BaseModel):
"""Arguments data model."""
attribute: str
"""The attribute to be returned by this lookup.
Supports additional syntax to retrieve a nested value.
"""
partition_key: str
"""The DynamoDB Table's partition key."""
partition_key_value: str
"""The value of the partition key to query the database."""
table_name: str
"""Name of the DynamoDB Table to query."""
@property
def item_key(self) -> Dict[str, Dict[Literal["B", "N", "S"], Any]]:
"""Value to pass to boto3 ``.get_item()`` call as the ``Key`` argument.
Raises:
ValueError: The value of ``partition_key_value`` doesn't match the
required regex and so it can't be parsed.
"""
pattern = re.compile(r"^(?P<value>[^\[]+)\[?(?P<data_type>[BNS]+)?]?$")
match = pattern.search(self.partition_key_value)
if not match:
raise ValueError(
f"Partition key value '{self.partition_key_value}' "
f"doesn't match regex: {pattern.pattern}"
)
return {
self.partition_key: {
cast(
Literal["B", "N", "S"], match.groupdict("S")["data_type"]
): match.group("value")
}
}
[docs]class DynamodbLookup(LookupHandler):
"""DynamoDB lookup."""
TYPE_NAME: Final[Literal["dynamodb"]] = "dynamodb"
"""Name that the Lookup is registered as."""
[docs] @classmethod
def parse(cls, value: str) -> Tuple[str, Dict[str, str]]:
"""Parse the value passed to the lookup.
This overrides the default parsing to account for special requirements.
Args:
value: The raw value passed to a lookup.
Returns:
The lookup query and a dict of arguments
Raises:
ValueError: The value provided does not appear to contain the name of
a DynamoDB Table. The name of a Table is required.
"""
raw_value = read_value_from_path(value)
args: Dict[str, str] = {}
if "@" not in raw_value:
raise ValueError(
f"'{raw_value}' missing delimiter for DynamoDB Table name:\n{_QUERY_PATTERN}"
)
table_info, table_keys = raw_value.split("@", 1)
if ":" in table_info:
args["region"], table_info = table_info.split(":", 1)
return "@".join([table_info, table_keys]), args
[docs] @classmethod
def parse_query(cls, value: str) -> QueryDataModel:
"""Parse query string to extract. Does not support arguments in ``value``.
Raises:
ValueError: The argument provided does not match the expected format defined
with a regex pattern.
"""
# https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html
pattern = re.compile(_QUERY_PATTERN)
match = pattern.search(value)
if not match:
raise ValueError(f"Query '{value}' doesn't match regex:\n{pattern.pattern}")
return QueryDataModel.parse_obj(match.groupdict())
[docs] @classmethod
def handle( # pylint: disable=arguments-differ
cls, value: str, context: CfnginContext, *__args: Any, **__kwargs: Any
) -> Any:
"""Get a value from a DynamoDB table.
Args:
value: Parameter(s) given to this lookup.
``[<region>:]<tablename>@<partitionkey>:<keyvalue>.<keyvalue>...``
context: Context instance.
Raises:
ValueError: The value provided to the lookup resulted in an error.
.. note:: The region is optional, and defaults to the environment's
``AWS_DEFAULT_REGION`` if not specified.
"""
raw_query, raw_args = cls.parse(value)
query = cls.parse_query(raw_query)
args = ArgsDataModel.parse_obj(raw_args)
table_keys = query.attribute.split(".")
key_dict = _lookup_key_parse(table_keys)
dynamodb = context.get_session(region=args.region).client("dynamodb")
try:
response = dynamodb.get_item(
TableName=query.table_name,
Key=query.item_key,
ProjectionExpression=",".join(
[query.partition_key, *key_dict["clean_table_keys"]]
),
)
except dynamodb.exceptions.ResourceNotFoundException as exc:
raise ValueError(
f"Can't find the DynamoDB table: {query.table_name}"
) from exc
except ClientError as exc:
if exc.response["Error"]["Code"] == "ValidationException":
raise ValueError(
f"No DynamoDB record matched the partition key: {query.partition_key}"
) from exc
raise ValueError(
f"The DynamoDB lookup '{value}' encountered an error: {exc}"
) from exc
# find and return the key from the dynamo data returned
if "Item" in response:
return _get_val_from_ddb_data(response["Item"], key_dict["new_keys"])
raise ValueError(
f"The DynamoDB record could not be found using the following: {query.item_key}"
)
[docs]class ParsedLookupKey(TypedDict):
"""Return value of _lookup_key_parse."""
clean_table_keys: List[str]
new_keys: List[Dict[Literal["L", "M", "N", "S"], str]]
def _lookup_key_parse(table_keys: List[str]) -> ParsedLookupKey:
"""Return the order in which the stacks should be executed.
Args:
table_keys: List of keys a table.
Returns:
Includes a dict of lookup types with data types ('new_keys')
and a list of the lookups without ('clean_table_keys')
Raises:
ValueError: DynamoDB data type is not supported.
"""
# we need to parse the key lookup passed in
regex_matcher = r"\[([^\]]+)]"
valid_dynamodb_datatypes = ["L", "M", "N", "S"]
clean_table_keys: List[str] = []
new_keys: List[Dict[Literal["L", "M", "N", "S"], str]] = []
for key in table_keys:
match = re.search(regex_matcher, key)
if match:
# the datatypes are pulled from the dynamodb docs
if match.group(1) not in valid_dynamodb_datatypes:
raise ValueError(
f"CFNgin does not support looking up the data type: {match.group(1)}"
)
match_val = cast(Literal["L", "M", "N", "S"], match.group(1))
key = key.replace(match.group(0), "")
new_keys.append({match_val: key})
else:
new_keys.append({"S": key})
clean_table_keys.append(key)
return {"new_keys": new_keys, "clean_table_keys": clean_table_keys}
def _get_val_from_ddb_data(data: Dict[str, Any], keylist: List[Dict[str, str]]) -> Any:
"""Return the value of the lookup.
Args:
data: The raw DynamoDB data.
keylist: A list of keys to lookup. This must include the datatype.
Returns:
The value from the DynamoDB record, and casts it to a matching python
datatype.
"""
next_type: Optional[str] = None
# iterate through the keylist to find the matching key/datatype
for key in keylist:
for k in key:
if next_type is None:
data = data[key[k]]
else:
temp_dict = data[next_type]
data = temp_dict[key[k]]
next_type = k
if next_type == "L":
# if type is list, convert it to a list and return
return _convert_ddb_list_to_list(data[cast(str, next_type)])
if next_type == "N":
# TODO: handle various types of 'number' datatypes, (e.g. int, double)
# if a number, convert to an int and return
return int(data[cast(str, next_type)])
# else, just assume its a string and return
return str(data[cast(str, next_type)])
def _convert_ddb_list_to_list(conversion_list: List[Dict[str, Any]]) -> List[Any]:
"""Return a python list without the DynamoDB datatypes.
Args:
conversion_list: A DynamoDB list which includes the datatypes.
Returns:
Returns A sanitized list without the datatypes.
"""
ret_list: List[Any] = []
for val in conversion_list:
for v in val:
ret_list.append(val[v])
return ret_list