"""CFNgin plan, plan componenets, and functions for interacting with a plan."""
import json
import logging
import os
import threading
import time
import uuid
from runway._logging import LogLevels, PrefixAdaptor
from .dag import DAG, DAGValidationError, walk
from .exceptions import CancelExecution, GraphError, PersistentGraphLocked, PlanFailed
from .status import (
COMPLETE,
FAILED,
PENDING,
SKIPPED,
SUBMITTED,
FailedStatus,
SkippedStatus,
)
from .ui import ui
from .util import merge_map, stack_template_key_name
LOGGER = logging.getLogger(__name__)
[docs]def json_serial(obj):
"""Serialize json.
Args:
obj (Any): A python object.
Example:
json.dumps(data, default=json_serial)
"""
if isinstance(obj, set):
return list(obj)
raise TypeError
[docs]def merge_graphs(graph1, graph2):
"""Combine two Graphs into one, retaining steps.
Args:
graph1 (:class:`Graph`): Graph that ``graph2`` will
be merged into.
graph2 (:class:`Graph`): Graph that will be merged
into ``graph1``.
Returns:
:class:`Graph`: A combined graph.
"""
merged_graph_dict = merge_map(graph1.to_dict().copy(), graph2.to_dict())
steps = [
graph1.steps.get(name, graph2.steps.get(name))
for name in merged_graph_dict.keys()
]
return Graph.from_steps(steps)
[docs]class Step(object):
"""State machine for executing generic actions related to stacks.
Attributes:
fn (Optional[Callable]): Function to run to execute the step.
This function will be ran multiple times until the step is "done".
last_updated (float): Time when the step was last updated.
logger (logging.LoggerAdaptor): Logger for logging messages about
the step.
stack (:class:`runway.cfngin.stack.Stack`): the stack associated with
this step
status (:class:`runway.cfngin.status.Status`): The status of step.
watch_func (Optional[Callable]): Function that will be called to
"tail" the step action.
"""
def __init__(self, stack, fn=None, watch_func=None):
"""Instantiate class.
Args:
stack (:class:`runway.cfngin.stack.Stack`): The stack associated
with this step
fn (Optional[Callable]): Function to run to execute the step.
This function will be ran multiple times until the step is
"done".
watch_func (Optional[Callable]): Function that will be called to
"tail" the step action.
"""
self.stack = stack
self.status = PENDING
self.last_updated = time.time()
self.logger = PrefixAdaptor(self.stack.name, LOGGER)
self.fn = fn
self.watch_func = watch_func
[docs] def run(self):
"""Run this step until it has completed or been skipped.
Returns:
bool
"""
stop_watcher = threading.Event()
watcher = None
if self.watch_func:
watcher = threading.Thread(
target=self.watch_func, args=(self.stack, stop_watcher)
)
watcher.start()
try:
while not self.done:
self._run_once()
finally:
if watcher:
stop_watcher.set()
watcher.join()
return self.ok
def _run_once(self):
"""Run a step exactly once.
Returns:
str
"""
try:
status = self.fn(self.stack, status=self.status)
except CancelExecution:
status = SkippedStatus("canceled execution")
except Exception as err: # pylint: disable=broad-except
LOGGER.exception(err)
status = FailedStatus(reason=str(err))
self.set_status(status)
return status
@property
def name(self):
"""Name of the step.
This is equal to the name of the stack it operates on.
Returns:
str
"""
return self.stack.name
@property
def requires(self):
"""Return a list of step names this step depends on.
Returns:
List[str]
"""
return self.stack.requires
@property
def required_by(self):
"""Return a list of step names that depend on this step.
Returns:
List[str]
"""
return self.stack.required_by
@property
def completed(self):
"""Return True if the step is in a COMPLETE state.
Returns:
bool
"""
return self.status == COMPLETE
@property
def skipped(self):
"""Return True if the step is in a SKIPPED state.
Returns:
bool
"""
return self.status == SKIPPED
@property
def failed(self):
"""Return True if the step is in a FAILED state.
Returns:
bool
"""
return self.status == FAILED
@property
def done(self):
"""Return True if the step is finished.
To be ``True``, status must be either COMPLETE, SKIPPED or FAILED)
Returns:
bool
"""
return self.completed or self.skipped or self.failed
@property
def ok(self):
"""Return True if the step is finished (either COMPLETE or SKIPPED).
Returns:
bool
"""
return self.completed or self.skipped
@property
def submitted(self):
"""Return True if the step is SUBMITTED, COMPLETE, or SKIPPED.
Returns:
bool
"""
return self.status >= SUBMITTED
[docs] def set_status(self, status):
"""Set the current step's status.
Args:
status (:class:`runway.cfngin.status.Status`): The status to set the
step to.
"""
if status is not self.status:
LOGGER.debug("setting %s state to %s...", self.stack.name, status.name)
self.status = status
self.last_updated = time.time()
if self.stack.logging:
self.log_step()
[docs] def complete(self):
"""Shortcut for ``set_status(COMPLETE)``."""
self.set_status(COMPLETE)
[docs] def log_step(self):
"""Construct a log message for a set and log it to the UI."""
msg = self.status.name
if self.status.reason:
msg += " (%s)" % self.status.reason
if self.status.code == SUBMITTED.code:
ui.log(LogLevels.NOTICE, msg, logger=self.logger)
elif self.status.code == COMPLETE.code:
ui.log(LogLevels.SUCCESS, msg, logger=self.logger)
elif self.status.code == FAILED.code:
ui.log(LogLevels.ERROR, msg, logger=self.logger)
else:
ui.info(msg, logger=self.logger)
[docs] def skip(self):
"""Shortcut for ``set_status(SKIPPED)``."""
self.set_status(SKIPPED)
[docs] def submit(self):
"""Shortcut for ``set_status(SUBMITTED)``."""
self.set_status(SUBMITTED)
[docs] @classmethod
def from_stack_name(
cls, stack_name, context, requires=None, fn=None, watch_func=None
):
"""Create a step using only a stack name.
Args:
stack_name (str): Name of a CloudFormation stack.
context (:class:`runway.cfngin.context.Context`): Context object.
Required to initialize a "fake" :class:`runway.cfngin.stack.Stack`.
requires (List[str]): Stacks that this stack depends on.
fn (Callable): The function to run to execute the step.
This function will be ran multiple times until the step
is "done".
watch_func (Callable): an optional function that will be
called to "tail" the step action.
Returns:
:class:`Step`
"""
# pylint: disable=import-outside-toplevel
from runway.cfngin.config import Stack as StackConfig
from runway.cfngin.stack import Stack
stack_def = StackConfig({"name": stack_name, "requires": requires or []})
stack = Stack(stack_def, context)
return cls(stack, fn=fn, watch_func=watch_func)
[docs] @classmethod
def from_persistent_graph(cls, graph_dict, context, fn=None, watch_func=None):
"""Create a steps for a persistent graph dict.
Args:
graph_dict (Dict[str, List[str]]): A graph dict.
context (:class:`runway.cfngin.context.Context`): Context object.
Required to initialize a "fake" :class:`runway.cfngin.stack.Stack`.
requires (List[str]): Stacks that this stack depends on.
fn (Callable): The function to run to execute the step.
This function will be ran multiple times until the step
is "done".
watch_func (Callable): an optional function that will be
called to "tail" the step action.
Returns:
List[:class:`Step`]
"""
steps = []
for name, requires in graph_dict.items():
steps.append(cls.from_stack_name(name, context, requires, fn, watch_func))
return steps
def __repr__(self):
"""Object represented as a string."""
return "<CFNgin.plan.Step:%s>" % (self.stack.name,)
def __str__(self):
"""Object displayed as a string."""
return self.stack.name
[docs]class Graph(object):
"""Graph represents a graph of steps.
The :class:`Graph` helps organize the steps needed to execute a particular
action for a set of :class:`runway.cfngin.stack.Stack` objects. When
initialized with a set of steps, it will first build a Directed Acyclic
Graph from the steps and their dependencies.
Attributes:
dag (:class:`runway.cfngin.dag.DAG`): an optional
:class:`runway.cfngin.dag.DAG` object. If one is not provided, a
new one will be initialized.
steps (Dict[str, :class:`Step`]): Dict with key of step name and
value of :class:`Step`.
Example:
>>> dag = DAG()
>>> a = Step("a", fn=build)
>>> b = Step("b", fn=build)
>>> dag.add_step(a)
>>> dag.add_step(b)
>>> dag.connect(a, b)
"""
def __init__(self, steps=None, dag=None):
"""Instantiate class.
Args:
steps (Optional[Dict[str, :class:`Step`]]): Dict with key of step
name and value of :class:`Step` for steps to initialize the
Graph with. Note that if this is provided, a pre-configured
:class:`runway.cfngin.dag.DAG` that already includes these
steps should also be provided..
dag (Optional[:class:`runway.cfngin.dag.DAG`]): An optional
:class:`runway.cfngin.dag.DAG` object. If one is not provided,
a new one will be initialized.
"""
self.steps = steps or {}
self.dag = dag or DAG()
[docs] def add_step(self, step, add_dependencies=False, add_dependants=False):
"""Add a step to the graph.
Args:
step (:class:`Step`): The step to be added.
add_dependencies (bool): Connect steps that need to be completed
before this step.
add_dependants (bool): Connect steps that require this step.
"""
self.steps[step.name] = step
self.dag.add_node(step.name)
if add_dependencies:
for dep in step.requires:
self.connect(step.name, dep)
if add_dependants:
for parent in step.required_by:
self.connect(parent, step.name)
[docs] def add_step_if_not_exists(
self, step, add_dependencies=False, add_dependants=False
):
"""Try to add a step to the graph.
Can be used when failure to add is acceptable.
Args:
step (:class:`Step`): The step to be added.
add_dependencies (bool): Connect steps that need to be completed
before this step.
add_dependants (bool): Connect steps that require this step.
"""
if self.steps.get(step.name):
return
self.steps[step.name] = step
self.dag.add_node_if_not_exists(step.name)
if add_dependencies:
for dep in step.requires:
try:
self.connect(step.name, dep)
except GraphError:
continue
if add_dependants:
for parent in step.required_by:
try:
self.connect(parent, step.name)
except GraphError:
continue
[docs] def add_steps(self, steps):
"""Add a list of steps.
Args:
steps (List[:class:`Step`]): The step to be added.
"""
for step in steps:
self.add_step(step)
for step in steps:
for dep in step.requires:
self.connect(step.name, dep)
for parent in step.required_by:
self.connect(parent, step.name)
[docs] def pop(self, step, default=None):
"""Remove a step from the graph.
Args:
step (:class:`Step`): The step to remove from the graph.
default (Any): Returned if the step could not be popped
Returns:
Any
"""
self.dag.delete_node_if_exists(step.name)
return self.steps.pop(step.name, default)
[docs] def connect(self, step, dep):
"""Connect a dependency to a step.
Args:
step (str): Step name to add a dependency to.
dep (str): Name of dependent step.
"""
try:
self.dag.add_edge(step, dep)
except KeyError as err:
raise GraphError(err, step, dep)
except DAGValidationError as err:
raise GraphError(err, step, dep)
[docs] def transitive_reduction(self):
"""Perform a transitive reduction on the underlying DAG.
The transitive reduction of a graph is a graph with as few edges as
possible with the same reachability as the original graph.
See https://en.wikipedia.org/wiki/Transitive_reduction
"""
self.dag.transitive_reduction()
[docs] def walk(self, walker, walk_func):
"""Walk the steps of the graph.
Args:
walker (Callable[[:class:`runway.cfngin.dag.DAG`], Any]): Function
used to walk the steps.
walk_func (Callable[[:class:`Step`], Any]): Function called with a
:class:`Step` as the only argument for each step of the plan.
"""
def fn(step_name):
"""Get a step by step name and execute the ``walk_func`` on it.
Args:
step_name (str): Name of a step.
"""
step = self.steps[step_name]
return walk_func(step)
return walker(self.dag, fn)
[docs] def downstream(self, step_name):
"""Return the direct dependencies of the given step."""
return list(self.steps[dep] for dep in self.dag.downstream(step_name))
[docs] def transposed(self):
"""Return a "transposed" version of this graph.
Useful for walking in reverse.
"""
return Graph(steps=self.steps, dag=self.dag.transpose())
[docs] def filtered(self, step_names):
"""Return a "filtered" version of this graph.
Args:
step_names (List[str]): Steps to filter.
"""
return Graph(steps=self.steps, dag=self.dag.filter(step_names))
[docs] def topological_sort(self):
"""Perform a topological sort of the underlying DAG.
Returns:
List[Step]
"""
nodes = self.dag.topological_sort()
return [self.steps[step_name] for step_name in nodes]
[docs] def to_dict(self):
"""Return the underlying DAG as a dictionary."""
return self.dag.graph
[docs] def dumps(self, indent=None):
"""Output the graph as a json seralized string for storage.
Args:
indent (Optional[int]): Number of spaces for each indentation.
Returns:
str
"""
return json.dumps(self.to_dict(), default=json_serial, indent=indent)
[docs] @classmethod
def from_dict(cls, graph_dict, context):
"""Create a Graph from a graph dict.
Args:
graph_dict (Dict[str, List[str]]): The dictionary used to
create the graph.
context (:class:`runway.cfngin.context.Context`): Required to init
stacks.
Returns:
:class:`Graph`
"""
return cls.from_steps(Step.from_persistent_graph(graph_dict, context))
[docs] @classmethod
def from_steps(cls, steps):
"""Create a Graph from Steps.
Args:
steps (List[:class:`Step`]): Steps used to create the graph.
Returns:
:class:`Graph`
"""
graph = cls()
graph.add_steps(steps)
return graph
def __str__(self):
"""Object displayed as a string."""
return self.dumps()
[docs]class Plan(object):
"""A convenience class for working on a Graph.
Attributes:
context (:class:`runway.cfngin.context.Context`): Context object.
description (str): Plan description.
graph (Graph): Graph of the plan.
id (str): UUID for the plan.
reverse (bool): The graph has been transposed for walking in reverse.
require_unlocked (bool): Require the persistent graph to be unlocked
before executing steps.
"""
def __init__(
self, description, graph, context=None, reverse=False, require_unlocked=True
):
"""Initialize class.
Args:
description (str): Description of what the plan is going to do.
graph (:class:`Graph`): Local graph used for the plan.
context (:class:`runway.cfngin.context.Context`): Context object.
reverse (bool): Transpose the graph for walking in reverse.
require_unlocked (bool): Require the persistent graph to be
unlocked before executing steps.
"""
self.context = context
self.description = description
self.id = uuid.uuid4()
self.reverse = reverse
self.require_unlocked = require_unlocked
if self.reverse:
graph = graph.transposed()
if self.context:
self.locked = self.context.persistent_graph_locked
if self.context.stack_names:
nodes = []
for target in self.context.stack_names:
if graph.steps.get(target):
nodes.append(target)
graph = graph.filtered(nodes)
else:
self.locked = False
self.graph = graph
[docs] def outline(self, level=logging.INFO, message=""):
"""Print an outline of the actions the plan is going to take.
The outline will represent the rough ordering of the steps that will be
taken.
Args:
level (Optional[int]): a valid log level that should be used to log
the outline
message (Optional[str]): a message that will be logged to
the user after the outline has been logged.
"""
steps = 1
LOGGER.log(level, 'plan "%s":', self.description)
for step in self.steps:
LOGGER.log(
level,
' - step: %s: target: "%s", action: "%s"',
steps,
step.name,
step.fn.__name__,
)
steps += 1
if message:
LOGGER.log(level, message)
[docs] def dump(self, directory, context, provider=None):
"""Output the rendered blueprint for all stacks in the plan.
Args:
directory (str): Directory where files will be created.
context (:class:`runway.cfngin.context.Contest`): Current CFNgin
context.
provider (:class:`runway.cfngin.providers.aws.default.Provider`):
Provider to use when resolving the blueprints.
"""
LOGGER.info('dumping "%s"...', self.description)
directory = os.path.expanduser(directory)
if not os.path.exists(directory):
os.makedirs(directory)
def walk_func(step):
"""Walk function."""
step.stack.resolve(
context=context, provider=provider,
)
blueprint = step.stack.blueprint
filename = stack_template_key_name(blueprint)
path = os.path.join(directory, filename)
blueprint_dir = os.path.dirname(path)
if not os.path.exists(blueprint_dir):
os.makedirs(blueprint_dir)
LOGGER.info('writing stack "%s" -> %s', step.name, path)
with open(path, "w") as _file:
_file.write(blueprint.rendered)
return True
return self.graph.walk(walk, walk_func)
[docs] def execute(self, *args, **kwargs):
"""Walk each step in the underlying graph.
Raises:
PersistentGraphLocked: Raised if the persistent graph is
locked prior to execution and this session did not lock it.
PlanFailed: Raised if any of the steps fail.
"""
if self.locked and self.require_unlocked:
raise PersistentGraphLocked
self.walk(*args, **kwargs)
failed_steps = [step for step in self.steps if step.status == FAILED]
if failed_steps:
raise PlanFailed(failed_steps)
[docs] def walk(self, walker):
"""Walk each step in the underlying graph, in topological order.
Args:
walker (func): a walker function to be passed to
:class:`runway.cfngin.dag.DAG` to walk the graph.
"""
def walk_func(step):
"""Execute a :class:`Step` wile walking the graph.
Handles updating the persistent graph if one is being used.
Args:
step (:class:`Step`): :class:`Step` to execute.
Returns:
bool
"""
# Before we execute the step, we need to ensure that it's
# transitive dependencies are all in an "ok" state. If not, we
# won't execute this step.
for dep in self.graph.downstream(step.name):
if not dep.ok:
step.set_status(FailedStatus("dependency has failed"))
return step.ok
result = step.run()
if not self.context or not self.context.persistent_graph:
return result
if step.completed or (
step.skipped
and step.status.reason == ("does not exist in cloudformation")
):
if step.fn.__name__ == "_destroy_stack":
self.context.persistent_graph.pop(step)
LOGGER.debug(
"removed step '%s' from the persistent graph", step.name
)
elif step.fn.__name__ == "_launch_stack":
self.context.persistent_graph.add_step_if_not_exists(
step, add_dependencies=True, add_dependants=True
)
LOGGER.debug("added step '%s' to the persistent graph", step.name)
else:
return result
self.context.put_persistent_graph(self.lock_code)
return result
return self.graph.walk(walker, walk_func)
@property
def lock_code(self):
"""Code to lock/unlock the persistent graph.
Returns:
str
"""
return str(self.id)
@property
def steps(self):
"""Return a list of all steps in the plan.
Returns:
List[:class:`Step`]
"""
steps = self.graph.topological_sort()
steps.reverse()
return steps
@property
def step_names(self):
"""Return a list of all step names.
Returns:
List[str]
"""
return [step.name for step in self.steps]
[docs] def keys(self):
"""Return a list of all step names.
Returns:
List[str]
"""
return self.step_names