Source code for runway.cfngin.plan

"""CFNgin plan, plan components, and functions for interacting with a plan."""

from __future__ import annotations

import json
import logging
import os
import threading
import time
import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    NoReturn,
    Optional,
    OrderedDict,
    Set,
    TypeVar,
    Union,
    overload,
)

from .._logging import LogLevels, PrefixAdaptor
from ..utils import merge_dicts
from .dag import DAG, DAGValidationError, walk
from .exceptions import CancelExecution, GraphError, PersistentGraphLocked, PlanFailed
from .stack import Stack
from .status import (
    COMPLETE,
    FAILED,
    PENDING,
    SKIPPED,
    SUBMITTED,
    FailedStatus,
    SkippedStatus,
)
from .ui import ui
from .utils import stack_template_key_name

if TYPE_CHECKING:
    from ..context import CfnginContext
    from .providers.aws.default import Provider
    from .status import Status

# Module-level logger shared by the functions and classes in this module.
LOGGER = logging.getLogger(__name__)

# Generic element type used by the ``json_serial`` overload signatures.
_T = TypeVar("_T")


@overload
def json_serial(obj: Set[_T]) -> List[_T]: ...


@overload
def json_serial(obj: Union[Dict[Any, Any], int, List[Any], str]) -> NoReturn: ...


def json_serial(obj: Union[Set[Any], Any]) -> Any:
    """Serialize objects that :mod:`json` cannot encode on its own.

    Intended to be passed as the ``default`` hook of :func:`json.dumps`.
    Only :class:`set` is handled; it is converted to a :class:`list`.

    Args:
        obj: A python object.

    Returns:
        A list containing the elements of ``obj`` when it is a set.

    Raises:
        TypeError: ``obj`` is not a set. The message names the offending
            type so the caller can tell what could not be serialized.

    Example:
        json.dumps(data, default=json_serial)

    """
    if isinstance(obj, set):
        return list(obj)
    # mirror the wording json itself uses so the failure is self-explanatory
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
def merge_graphs(graph1: Graph, graph2: Graph) -> Graph:
    """Build a new Graph from the union of two Graphs, reusing their steps.

    The dictionary forms of both graphs are merged to get the set of step
    names. For each name the step object from ``graph1`` is used when it has
    one, otherwise the one from ``graph2``.

    Args:
        graph1: Graph that ``graph2`` will be merged into.
        graph2: Graph that will be merged into ``graph1``.

    Returns:
        A new Graph created from the retained steps.

    """
    # copy graph1's dict so the merge does not write into its top-level mapping
    combined = merge_dicts(graph1.to_dict().copy(), graph2.to_dict())

    retained = []
    for name in combined:
        candidate = graph1.steps.get(name, graph2.steps.get(name))
        # names present in the merged dict but missing from both step maps
        # yield None and are left out
        if candidate:
            retained.append(candidate)
    return Graph.from_steps(retained)
class Step:
    """State machine that drives one stack-related action to a final status.

    A step pairs a stack with the function that acts on it. :meth:`run`
    invokes that function repeatedly until the status is COMPLETE, SKIPPED,
    or FAILED.

    Attributes:
        fn: Function to run to execute the step. This function will be ran
            multiple times until the step is "done".
        last_updated: Time when the step was last updated.
        logger: Logger for logging messages about the step.
        stack: The stack associated with this step.
        status: The status of step.
        watch_func: Function that will be called to "tail" the step action.

    """

    fn: Optional[Callable[..., Any]]
    last_updated: float
    logger: PrefixAdaptor
    stack: Stack
    status: Status
    watch_func: Optional[Callable[..., Any]]

    def __init__(
        self,
        stack: Stack,
        *,
        fn: Optional[Callable[..., Any]] = None,
        watch_func: Optional[Callable[..., Any]] = None,
    ) -> None:
        """Instantiate class.

        The step starts in the PENDING status.

        Args:
            stack: The stack associated with this step.
            fn: Function to run to execute the step. This function will be
                ran multiple times until the step is "done".
            watch_func: Function that will be called to "tail" the step
                action.

        """
        self.fn = fn
        self.watch_func = watch_func
        self.stack = stack
        self.logger = PrefixAdaptor(stack.name, LOGGER)
        self.status = PENDING
        self.last_updated = time.time()

    def run(self) -> bool:
        """Run this step until it has completed or been skipped.

        When ``watch_func`` is set it is started in a separate thread with
        the stack and a :class:`threading.Event`; the event is set and the
        thread joined once the step is done (or an exception escapes).

        Returns:
            The value of :attr:`ok` after the step has finished.

        """
        halt_signal = threading.Event()
        tail_thread: Optional[threading.Thread] = None
        if self.watch_func:
            tail_thread = threading.Thread(
                target=self.watch_func, args=(self.stack, halt_signal)
            )
            tail_thread.start()

        try:
            while True:
                if self.done:
                    break
                self._run_once()
        finally:
            # always stop the watcher, even if _run_once raised
            if tail_thread is not None:
                halt_signal.set()
                tail_thread.join()
        return self.ok

    def _run_once(self) -> Status:
        """Invoke ``fn`` a single time and record the resulting status.

        ``CancelExecution`` is translated into a skipped status and any other
        exception into a failed status (after being logged).

        Raises:
            TypeError: ``fn`` was never provided.

        """
        if not self.fn:
            raise TypeError("Step.fn must be type Callable[..., Status] not None")

        try:
            outcome = self.fn(self.stack, status=self.status)
        except CancelExecution:
            outcome = SkippedStatus("canceled execution")
        except Exception as err:  # pylint: disable=broad-except
            LOGGER.exception(err)
            outcome = FailedStatus(reason=str(err))

        self.set_status(outcome)
        return outcome

    @property
    def name(self) -> str:
        """Name of the step.

        This is equal to the name of the stack it operates on.

        """
        return self.stack.name

    @property
    def requires(self) -> Set[str]:
        """Names of the steps this step depends on (taken from the stack)."""
        return self.stack.requires

    @property
    def required_by(self) -> Set[str]:
        """Names of the steps that depend on this step (taken from the stack)."""
        return self.stack.required_by

    @property
    def completed(self) -> bool:
        """Whether the status equals COMPLETE."""
        return self.status == COMPLETE

    @property
    def skipped(self) -> bool:
        """Whether the status equals SKIPPED."""
        return self.status == SKIPPED

    @property
    def failed(self) -> bool:
        """Whether the status equals FAILED."""
        return self.status == FAILED

    @property
    def done(self) -> bool:
        """Whether the step is finished.

        To be ``True``, status must be either COMPLETE, SKIPPED or FAILED.

        """
        return self.ok or self.failed

    @property
    def ok(self) -> bool:
        """Whether the step finished without failing (COMPLETE or SKIPPED)."""
        return self.completed or self.skipped

    @property
    def submitted(self) -> bool:
        """Whether the status compares greater than or equal to SUBMITTED."""
        return self.status >= SUBMITTED

    def set_status(self, status: Status) -> None:
        """Set the current step's status.

        Nothing happens when ``status`` is the very same object as the
        current status. Otherwise the status and ``last_updated`` are
        replaced and, if the stack has logging enabled, the change is logged.

        Args:
            status: The status to set the step to.

        """
        if status is self.status:
            return

        LOGGER.debug("setting %s state to %s...", self.stack.name, status.name)
        self.status = status
        self.last_updated = time.time()
        if self.stack.logging:
            self.log_step()

    def complete(self) -> None:
        """Shortcut for ``set_status(COMPLETE)``."""
        self.set_status(COMPLETE)

    def log_step(self) -> None:
        """Construct a log message for the step and log it to the UI.

        The message is the status name followed by the reason (when there is
        one). The log level is picked from the status code.

        """
        current = self.status
        text = current.name
        if current.reason:
            text += f" ({current.reason})"

        code = current.code
        if code == SUBMITTED.code:
            level = LogLevels.NOTICE
        elif code == COMPLETE.code:
            level = LogLevels.SUCCESS
        elif code == FAILED.code:
            level = LogLevels.ERROR
        else:
            # every other status is reported at the regular info level
            ui.info(text, logger=self.logger)
            return
        ui.log(level, text, logger=self.logger)

    def skip(self) -> None:
        """Shortcut for ``set_status(SKIPPED)``."""
        self.set_status(SKIPPED)

    def submit(self) -> None:
        """Shortcut for ``set_status(SUBMITTED)``."""
        self.set_status(SUBMITTED)

    @classmethod
    def from_stack_name(
        cls,
        stack_name: str,
        context: CfnginContext,
        requires: Optional[Union[List[str], Set[str]]] = None,
        fn: Optional[Callable[..., Status]] = None,
        watch_func: Optional[Callable[..., Any]] = None,
    ) -> Step:
        """Create a step using only a stack name.

        Args:
            stack_name: Name of a CloudFormation stack.
            context: Context object. Required to initialize a "fake"
                :class:`runway.cfngin.stack.Stack`.
            requires: Stacks that this stack depends on.
            fn: The function to run to execute the step. This function will
                be ran multiple times until the step is "done".
            watch_func: An optional function that will be called to "tail"
                the step action.

        """
        # pylint: disable=import-outside-toplevel
        from runway.config.models.cfngin import CfnginStackDefinitionModel

        definition = CfnginStackDefinitionModel.construct(
            name=stack_name, requires=requires or []
        )
        return cls(Stack(definition, context), fn=fn, watch_func=watch_func)

    @classmethod
    def from_persistent_graph(
        cls,
        graph_dict: Union[
            Dict[str, List[str]], Dict[str, Set[str]], OrderedDict[str, Set[str]]
        ],
        context: CfnginContext,
        fn: Optional[Callable[..., Status]] = None,
        watch_func: Optional[Callable[..., Any]] = None,
    ) -> List[Step]:
        """Create steps for a persistent graph dict.

        One step is created per key of ``graph_dict``; the value for that key
        is used as the step's requirements.

        Args:
            graph_dict: A graph dict.
            context: Context object. Required to initialize a "fake"
                :class:`runway.cfngin.stack.Stack`.
            fn: The function to run to execute the step. This function will
                be ran multiple times until the step is "done".
            watch_func: An optional function that will be called to "tail"
                the step action.

        """
        created: List[Step] = []
        for name, requires in graph_dict.items():
            created.append(
                cls.from_stack_name(name, context, requires, fn, watch_func)
            )
        return created

    def __repr__(self) -> str:
        """Object represented as a string."""
        return f"<CFNgin.plan.Step:{self.stack.name}>"

    def __str__(self) -> str:
        """Object displayed as a string."""
        return self.stack.name
class Graph:
    """Graph represents a graph of steps.

    The :class:`Graph` helps organize the steps needed to execute a particular
    action for a set of :class:`runway.cfngin.stack.Stack` objects. It keeps
    the :class:`Step` objects by name alongside a Directed Acyclic Graph of
    those names and their dependencies.

    Example:
        >>> graph = Graph()
        >>> a = Step(stack_a, fn=deploy)
        >>> b = Step(stack_b, fn=deploy)
        >>> graph.add_step(a)
        >>> graph.add_step(b)
        >>> graph.connect(a.name, b.name)

    """

    dag: DAG
    steps: Dict[str, Step]

    def __init__(
        self, steps: Optional[Dict[str, Step]] = None, dag: Optional[DAG] = None
    ) -> None:
        """Instantiate class.

        Args:
            steps: Dict with key of step name and value of :class:`Step` for
                steps to initialize the Graph with. Note that if this is
                provided, a pre-configured :class:`runway.cfngin.dag.DAG`
                that already includes these steps should also be provided.
            dag: An optional :class:`runway.cfngin.dag.DAG` object. If one is
                not provided, a new one will be initialized.

        """
        self.steps = steps if steps else {}
        self.dag = dag if dag else DAG()

    def add_step(
        self, step: Step, add_dependencies: bool = False, add_dependents: bool = False
    ) -> None:
        """Add a step to the graph.

        Args:
            step: The step to be added.
            add_dependencies: Connect steps that need to be completed before
                this step.
            add_dependents: Connect steps that require this step.

        """
        step_name = step.name
        self.steps[step_name] = step
        self.dag.add_node(step_name)

        if add_dependencies:
            for requirement in step.requires:
                self.connect(step.name, requirement)

        if add_dependents:
            for dependent in step.required_by:
                self.connect(dependent, step.name)

    def add_step_if_not_exists(
        self, step: Step, add_dependencies: bool = False, add_dependents: bool = False
    ) -> None:
        """Try to add a step to the graph.

        Can be used when failure to add is acceptable. A step whose name is
        already registered is left alone, and connections that raise
        ``GraphError`` are skipped.

        Args:
            step: The step to be added.
            add_dependencies: Connect steps that need to be completed before
                this step.
            add_dependents: Connect steps that require this step.

        """
        if self.steps.get(step.name):
            return

        self.steps[step.name] = step
        self.dag.add_node_if_not_exists(step.name)

        if add_dependencies:
            for requirement in step.requires:
                try:
                    self.connect(step.name, requirement)
                except GraphError:
                    # best effort: an edge that cannot be added is ignored
                    pass

        if add_dependents:
            for dependent in step.required_by:
                try:
                    self.connect(dependent, step.name)
                except GraphError:
                    pass

    def add_steps(self, steps: List[Step]) -> None:
        """Add a list of steps.

        All steps are registered first so that every node exists before any
        edge between them is created.

        Args:
            steps: The steps to be added.

        """
        for new_step in steps:
            self.add_step(new_step)

        for new_step in steps:
            for requirement in new_step.requires:
                self.connect(new_step.name, requirement)
            for dependent in new_step.required_by:
                self.connect(dependent, new_step.name)

    def pop(self, step: Step, default: Any = None) -> Any:
        """Remove a step from the graph.

        Args:
            step: The step to remove from the graph.
            default: Returned if the step could not be popped.

        """
        step_name = step.name
        self.dag.delete_node_if_exists(step_name)
        return self.steps.pop(step_name, default)

    def connect(self, step: str, dep: str) -> None:
        """Connect a dependency to a step.

        Args:
            step: Step name to add a dependency to.
            dep: Name of the step that ``step`` depends on.

        Raises:
            GraphError: The underlying DAG rejected the edge.

        """
        try:
            self.dag.add_edge(step, dep)
        except (DAGValidationError, KeyError) as exc:
            raise GraphError(exc, step, dep) from None

    def transitive_reduction(self) -> None:
        """Perform a transitive reduction on the underlying DAG.

        The transitive reduction of a graph is a graph with as few edges as
        possible with the same reachability as the original graph.

        See https://en.wikipedia.org/wiki/Transitive_reduction

        """
        self.dag.transitive_reduction()

    def walk(
        self,
        walker: Callable[[DAG, Callable[[str], Any]], Any],
        walk_func: Callable[[Step], Any],
    ) -> Any:
        """Walk the steps of the graph.

        Args:
            walker: Function used to walk the steps.
            walk_func: Function called with a :class:`Step` as the only
                argument for each step of the plan.

        """

        def visit(step_name: str) -> Any:
            """Resolve a step name to its :class:`Step` and run ``walk_func``.

            Args:
                step_name: Name of a step.

            """
            return walk_func(self.steps[step_name])

        return walker(self.dag, visit)

    def downstream(self, step_name: str) -> List[Step]:
        """Return the direct dependencies of the given step."""
        found: List[Step] = []
        for dep_name in self.dag.downstream(step_name):
            found.append(self.steps[dep_name])
        return found

    def transposed(self) -> Graph:
        """Return a "transposed" version of this graph.

        Useful for walking in reverse. The returned graph shares this graph's
        step mapping.

        """
        flipped = self.dag.transpose()
        return Graph(steps=self.steps, dag=flipped)

    def filtered(self, step_names: List[str]) -> Graph:
        """Return a "filtered" version of this graph.

        The returned graph shares this graph's step mapping.

        Args:
            step_names: Steps to filter.

        """
        reduced = self.dag.filter(step_names)
        return Graph(steps=self.steps, dag=reduced)

    def topological_sort(self) -> List[Step]:
        """Perform a topological sort of the underlying DAG."""
        ordered: List[Step] = []
        for step_name in self.dag.topological_sort():
            ordered.append(self.steps[step_name])
        return ordered

    def to_dict(self) -> OrderedDict[str, Set[str]]:
        """Return the underlying DAG as a dictionary."""
        return self.dag.graph

    def dumps(self, indent: Optional[int] = None) -> str:
        """Output the graph as a json serialized string for storage.

        Args:
            indent: Number of spaces for each indentation.

        """
        payload = self.to_dict()
        return json.dumps(payload, default=json_serial, indent=indent)

    @classmethod
    def from_dict(
        cls,
        graph_dict: Union[
            Dict[str, List[str]], Dict[str, Set[str]], OrderedDict[str, Set[str]]
        ],
        context: CfnginContext,
    ) -> Graph:
        """Create a Graph from a graph dict.

        Args:
            graph_dict: The dictionary used to create the graph.
            context: Required to init stacks.

        """
        steps = Step.from_persistent_graph(graph_dict, context)
        return cls.from_steps(steps)

    @classmethod
    def from_steps(cls, steps: List[Step]) -> Graph:
        """Create a Graph from Steps.

        Args:
            steps: Steps used to create the graph.

        """
        new_graph = cls()
        new_graph.add_steps(steps)
        return new_graph

    def __str__(self) -> str:
        """Object displayed as a string."""
        return self.dumps()
class Plan:
    """A convenience class for working on a Graph.

    Attributes:
        context: Context object.
        description: Plan description.
        graph: Graph of the plan.
        id: UUID for the plan.
        locked: Whether the persistent graph was reported as locked by the
            context when the plan was created. ``False`` without a context.
        reverse: The graph has been transposed for walking in reverse.
        require_unlocked: Require the persistent graph to be unlocked before
            executing steps.

    """

    context: Optional[CfnginContext]
    description: str
    graph: Graph
    id: uuid.UUID
    locked: bool
    require_unlocked: bool
    reverse: bool

    def __init__(
        self,
        description: str,
        graph: Graph,
        context: Optional[CfnginContext] = None,
        reverse: bool = False,
        require_unlocked: bool = True,
    ) -> None:
        """Initialize class.

        Args:
            description: Description of what the plan is going to do.
            graph: Local graph used for the plan.
            context: Context object.
            reverse: Transpose the graph for walking in reverse.
            require_unlocked: Require the persistent graph to be unlocked
                before executing steps.

        """
        self.context = context
        self.description = description
        self.id = uuid.uuid4()
        self.reverse = reverse
        self.require_unlocked = require_unlocked

        if self.reverse:
            graph = graph.transposed()

        if self.context:
            self.locked = self.context.persistent_graph_locked

            if self.context.stack_names:
                # only keep targeted stacks that actually exist in the graph
                nodes = [
                    target
                    for target in self.context.stack_names
                    if graph.steps.get(target)
                ]
                graph = graph.filtered(nodes)
        else:
            self.locked = False

        self.graph = graph

    def outline(self, level: int = logging.INFO, message: str = "") -> None:
        """Print an outline of the actions the plan is going to take.

        The outline will represent the rough ordering of the steps that
        will be taken.

        Args:
            level: A valid log level that should be used to log the outline.
            message: A message that will be logged to the user after the
                outline has been logged.

        """
        LOGGER.log(level, 'plan "%s":', self.description)
        for position, step in enumerate(self.steps, start=1):
            LOGGER.log(
                level,
                ' - step: %s: target: "%s", action: "%s"',
                position,
                step.name,
                step.fn.__name__ if callable(step.fn) else step.fn,
            )
        if message:
            LOGGER.log(level, message)

    def dump(
        self,
        *,
        directory: str,
        context: CfnginContext,
        provider: Optional[Provider] = None,
    ) -> Any:
        """Output the rendered blueprint for all stacks in the plan.

        Args:
            directory: Directory where files will be created.
            context: Current CFNgin context.
            provider: Provider to use when resolving the blueprints.

        Returns:
            Whatever walking the graph returns.

        """
        LOGGER.info('dumping "%s"...', self.description)
        directory = os.path.expanduser(directory)
        # exist_ok avoids the check-then-create race of testing for the
        # directory before making it
        os.makedirs(directory, exist_ok=True)

        def walk_func(step: Step) -> bool:
            """Resolve a step's stack and write its rendered blueprint."""
            step.stack.resolve(context=context, provider=provider)
            blueprint = step.stack.blueprint
            filename = stack_template_key_name(blueprint)
            path = os.path.join(directory, filename)

            # the key name may contain sub-directories that do not exist yet
            os.makedirs(os.path.dirname(path), exist_ok=True)

            LOGGER.info('writing stack "%s" -> %s', step.name, path)
            with open(path, "w", encoding="utf-8") as _file:
                _file.write(blueprint.rendered)
            return True

        return self.graph.walk(walk, walk_func)

    def execute(self, *args: Any, **kwargs: Any) -> None:
        """Walk each step in the underlying graph.

        Positional and keyword arguments are passed through to :meth:`walk`.

        Raises:
            PersistentGraphLocked: Raised if the persistent graph is locked
                prior to execution and this session did not lock it.
            PlanFailed: Raised if any of the steps fail.

        """
        if self.locked and self.require_unlocked:
            raise PersistentGraphLocked

        self.walk(*args, **kwargs)

        failed_steps = [step for step in self.steps if step.status == FAILED]
        if failed_steps:
            raise PlanFailed(failed_steps)

    def walk(self, walker: Callable[..., Any]) -> Any:
        """Walk each step in the underlying graph, in topological order.

        Args:
            walker: A walker function to be passed to
                :class:`runway.cfngin.dag.DAG` to walk the graph.

        """

        def walk_func(step: Step) -> bool:
            """Execute a :class:`Step` while walking the graph.

            Handles updating the persistent graph if one is being used.

            Args:
                step: :class:`Step` to execute.

            """
            # Before we execute the step, make sure the dependencies the
            # graph reports for it are all in an "ok" state. If not, the
            # step is marked as failed instead of being run.
            for dep in self.graph.downstream(step.name):
                if not dep.ok:
                    step.set_status(FailedStatus("dependency has failed"))
                    return step.ok

            result = step.run()

            if not self.context or not self.context.persistent_graph:
                return result

            if step.completed or (
                step.skipped
                and step.status.reason == ("does not exist in cloudformation")
            ):
                fn_name = step.fn.__name__ if callable(step.fn) else step.fn
                if fn_name == "_destroy_stack":
                    self.context.persistent_graph.pop(step)
                    LOGGER.debug(
                        "removed step '%s' from the persistent graph", step.name
                    )
                elif fn_name == "_launch_stack":
                    self.context.persistent_graph.add_step_if_not_exists(
                        step, add_dependencies=True, add_dependents=True
                    )
                    LOGGER.debug("added step '%s' to the persistent graph", step.name)
                else:
                    # any other action leaves the persistent graph untouched
                    return result
                self.context.put_persistent_graph(self.lock_code)
            return result

        return self.graph.walk(walker, walk_func)

    @property
    def lock_code(self) -> str:
        """Code to lock/unlock the persistent graph (the plan id as a string)."""
        return str(self.id)

    @property
    def steps(self) -> List[Step]:
        """Return a list of all steps in the plan.

        This is the graph's topological sort in reverse order; it is
        recomputed on every access.

        """
        steps = self.graph.topological_sort()
        steps.reverse()
        return steps

    @property
    def step_names(self) -> List[str]:
        """Return a list of all step names."""
        return [step.name for step in self.steps]

    def keys(self) -> List[str]:
        """Return a list of all step names."""
        return self.step_names