"""CFNgin graph action."""
from __future__ import annotations
import json
import logging
import sys
from typing import TYPE_CHECKING, Any, Iterable, List, TextIO, Tuple, Union
from ..plan import merge_graphs
from .base import BaseAction
if TYPE_CHECKING:
from ..plan import Graph, Step
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
def each_step(graph: Graph) -> Iterable[Tuple[Step, List[Step]]]:
    """Yield each step and its direct dependencies.

    Steps are visited in the reverse of the order returned by
    ``graph.topological_sort()``.

    Args:
        graph: Graph to iterate over.

    Yields:
        A ``(step, deps)`` pair for every step, where ``deps`` is the value
        returned by ``graph.downstream(step.name)``.

    """
    # ``reversed`` walks the sorted list back-to-front without mutating it,
    # so the list handed back by the graph is left exactly as it was.
    for step in reversed(graph.topological_sort()):
        yield step, graph.downstream(step.name)
# Dispatch table from an output format name to the callable that renders the
# graph. ``Action.run`` looks up the requested ``format`` keyword here
# (defaulting to "json") and invokes the result as ``fn(sys.stdout, graph)``.
# NOTE(review): ``dot_format`` and ``json_format`` are not defined in the
# visible portion of this file - confirm they are defined above this point.
FORMATTERS = {
"dot": dot_format,
"json": json_format,
}
class Action(BaseAction):
    """Responsible for outputting a graph for the current CFNgin config."""

    DESCRIPTION = "Print graph"
    NAME = "graph"

    @property
    def _stack_action(self) -> Any:
        """Run against a step.

        This action does nothing per step, so the property is always ``None``.

        """
        return None

    def run(
        self,
        *,
        concurrency: int = 0,  # pylint: disable=unused-argument
        dump: Union[bool, str] = False,  # pylint: disable=unused-argument
        force: bool = False,  # pylint: disable=unused-argument
        outline: bool = False,  # pylint: disable=unused-argument
        tail: bool = False,  # pylint: disable=unused-argument
        upload_disabled: bool = False,  # pylint: disable=unused-argument
        **kwargs: Any,
    ) -> None:
        """Generate the underlying graph and print it to ``sys.stdout``.

        Args:
            concurrency: Unused by this action.
            dump: Unused by this action.
            force: Unused by this action.
            outline: Unused by this action.
            tail: Unused by this action.
            upload_disabled: Unused by this action.
            **kwargs: Optional settings read by this action:

                * ``reduce`` - when truthy, a transitive reduction is applied
                  to the graph before it is printed.
                * ``format`` - key into ``FORMATTERS`` selecting the output
                  format. Missing, ``None`` or empty values fall back to
                  ``"json"``.

        Raises:
            KeyError: If the requested format is not a key of ``FORMATTERS``.

        """
        graph = self._generate_plan(
            require_unlocked=False, include_persistent_graph=True
        ).graph
        if self.context.persistent_graph:
            graph = merge_graphs(self.context.persistent_graph, graph)
        if kwargs.get("reduce"):
            # This will perform a transitive reduction on the underlying
            # graph, producing fewer edges. Mostly useful for the "dot"
            # format, when converting to PNG, so it creates a prettier/cleaner
            # dependency graph.
            graph.transitive_reduction()

        # ``or "json"`` also covers an explicit ``format=None``, which would
        # otherwise be stringified to "None" and fail the lookup.
        format_name = str(kwargs.get("format") or "json")
        try:
            formatter = FORMATTERS[format_name]
        except KeyError:
            supported = ", ".join(sorted(FORMATTERS))
            raise KeyError(
                f"unsupported graph format {format_name!r}; "
                f"expected one of: {supported}"
            ) from None
        formatter(sys.stdout, graph)
        sys.stdout.flush()