Source code for runway.cfngin.hooks.utils

"""Hook utils."""
from __future__ import annotations

import collections.abc
import logging
import os
import sys
from typing import TYPE_CHECKING, Any, Dict, List, cast

import pydantic

from ...exceptions import FailedVariableLookup
from ...utils import BaseModel, load_object_from_string
from ...variables import Variable, resolve_variables
from ..blueprints.base import Blueprint

if TYPE_CHECKING:
    from ...config.models.cfngin import CfnginHookDefinitionModel
    from ...context import CfnginContext
    from ..providers.aws.default import Provider

LOGGER = logging.getLogger(__name__)


class BlankBlueprint(Blueprint):
    """Blueprint that can be built programmatically."""

    def create_template(self) -> None:
        """Create template without raising NotImplementedError."""


# TODO BREAKING find a better place for this
class TagDataModel(BaseModel):
    """AWS Resource Tag data model."""

    key: str
    value: str

    class Config:
        """Model configuration."""

        allow_population_by_field_name = True
        extra = pydantic.Extra.forbid
        fields = {
            "key": {"alias": "Key"},
            "value": {"alias": "Value"},
        }
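

# Illustrative only (not part of the original module): a minimal sketch showing
# how TagDataModel accepts either the field names or the "Key"/"Value" aliases
# configured above, assuming pydantic v1 alias semantics and that the imported
# BaseModel behaves like ``pydantic.BaseModel``. The helper name is hypothetical.
def _example_tag_usage() -> None:
    """Hypothetical helper demonstrating alias-based population."""
    by_alias = TagDataModel(Key="Environment", Value="prod")
    by_field = TagDataModel(key="Environment", value="prod")
    # Both spellings populate the same fields...
    assert by_alias == by_field
    # ...and ``dict(by_alias=True)`` emits the CloudFormation-style casing.
    assert by_alias.dict(by_alias=True) == {"Key": "Environment", "Value": "prod"}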


def full_path(path: str) -> str:
    """Return full path."""
    return os.path.abspath(os.path.expanduser(path))


# TODO split up to reduce number of statements
def handle_hooks(  # pylint: disable=too-many-statements
    stage: str,
    hooks: List[CfnginHookDefinitionModel],
    provider: Provider,
    context: CfnginContext,
):
    """Handle pre/post_deploy hooks.

    These are pieces of code that we want to run before/after deploying stacks.

    Args:
        stage: The current stage (pre_run, post_run, etc).
        hooks: Hooks to execute.
        provider: Provider instance.
        context: Context instance.

    """
    if not hooks:
        LOGGER.debug("no %s hooks defined", stage)
        return

    hook_paths: List[str] = []
    for i, hook in enumerate(hooks):
        try:
            hook_paths.append(hook.path)
        except KeyError as exc:
            raise ValueError(f"{stage} hook #{i} missing path.") from exc

    LOGGER.info("executing %s hooks: %s", stage, ", ".join(hook_paths))
    for hook in hooks:
        if not hook.enabled:
            LOGGER.debug("hook with method %s is disabled; skipping", hook.path)
            continue
        try:
            method = load_object_from_string(hook.path, try_reload=True)
        except (AttributeError, ImportError):
            LOGGER.exception("unable to load method at %s", hook.path)
            if hook.required:
                raise
            continue

        if hook.args:
            args = [Variable(k, v) for k, v in hook.args.items()]
            try:  # handling for output or similar being used in pre_deploy
                resolve_variables(args, context, provider)
            except FailedVariableLookup:
                if "pre" in stage:
                    LOGGER.error(
                        "lookups that change the order of execution, like "
                        '"output", can only be used in "post_*" hooks; '
                        "please ensure that the hook being used does "
                        "not rely on a stack, hook_data, or context that "
                        "does not exist yet"
                    )
                raise
            kwargs: Dict[str, Any] = {v.name: v.value for v in args}
        else:
            kwargs = {}

        try:
            if isinstance(method, type):
                result: Any = getattr(
                    method(context=context, provider=provider, **kwargs), stage
                )()
            else:
                result = cast(
                    Any, method(context=context, provider=provider, **kwargs)
                )
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("hook %s threw an exception", hook.path)
            if hook.required:
                raise
            continue

        if not result:
            if hook.required:
                LOGGER.error(
                    "required hook %s failed; return value: %s", hook.path, result
                )
                sys.exit(1)
            LOGGER.warning(
                "non-required hook %s failed; return value: %s", hook.path, result
            )
        else:
            if isinstance(result, (collections.abc.Mapping, pydantic.BaseModel)):
                if hook.data_key:
                    LOGGER.debug(
                        "adding result for hook %s to context in data_key %s",
                        hook.path,
                        hook.data_key,
                    )
                    context.set_hook_data(hook.data_key, result)
                else:
                    LOGGER.debug(
                        "hook %s returned result data but no data key set; ignoring",
                        hook.path,
                    )
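

# Illustrative only (not part of the original module): a rough sketch of how
# ``handle_hooks`` is driven. ``CfnginHookDefinitionModel`` is only imported
# above for type checking, and the ``context``/``provider`` objects are built
# by CFNgin itself, so this stays as a commented example; the hook path and
# args shown are hypothetical.
#
#     hooks = [
#         CfnginHookDefinitionModel(
#             path="my_hooks.ensure_bucket",
#             required=True,
#             args={"bucket_name": "example-artifacts"},
#         )
#     ]
#     handle_hooks("pre_deploy", hooks, provider, context)
#
# Each hook callable receives ``context``, ``provider``, and any resolved
# ``args`` as keyword arguments; a falsy return value aborts the run only when
# the hook is marked ``required``, and mapping/model results are stored under
# ``data_key`` if one is set.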