runway.cfngin.dag package
CFNgin directed acyclic graph (DAG) implementation.

exception runway.cfngin.dag.DAGValidationError
Bases: Exception
Raised when DAG validation fails.

with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.


class runway.cfngin.dag.DAG
Bases: object
Directed acyclic graph implementation.
Instantiate a new DAG with no nodes or edges.

add_node(node_name: str) → None
Add a node if it does not exist yet, or error out.
 Parameters
node_name – The unique name of the node to add.
 Raises
KeyError – Raised if a node with the same name already exists in the graph.

add_node_if_not_exists(node_name: str) → None
Add a node if it does not exist yet, ignoring duplicates.
 Parameters
node_name – The name of the node to add.
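The difference between the strict and the lenient variant can be illustrated with a minimal sketch. `SketchDAG` and its `graph` attribute (a dict mapping each node name to the set of nodes it has edges towards) are hypothetical stand-ins for illustration, not the library's implementation.

```python
from typing import Dict, Set


class SketchDAG:
    """Hypothetical stand-in for illustration; not runway's DAG class."""

    def __init__(self) -> None:
        # Assumed storage: node name -> set of nodes it has edges towards.
        self.graph: Dict[str, Set[str]] = {}

    def add_node(self, node_name: str) -> None:
        # Strict variant: a duplicate name is an error.
        if node_name in self.graph:
            raise KeyError(f"node {node_name} already exists")
        self.graph[node_name] = set()

    def add_node_if_not_exists(self, node_name: str) -> None:
        # Lenient variant: swallow the duplicate error.
        try:
            self.add_node(node_name)
        except KeyError:
            pass


dag = SketchDAG()
dag.add_node("vpc")
dag.add_node_if_not_exists("vpc")  # no error, graph unchanged
try:
    dag.add_node("vpc")
    duplicate_rejected = False
except KeyError:
    duplicate_rejected = True
```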

delete_node(node_name: str) → None
Delete this node and all edges referencing it.
 Parameters
node_name – The name of the node to delete.
 Raises
KeyError – Raised if the node does not exist in the graph.

delete_node_if_exists(node_name: str) → None
Delete this node and all edges referencing it.
Ignores any node that is not in the graph, rather than throwing an exception.
 Parameters
node_name – The name of the node to delete.
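Deleting a node means removing both its own entry and every edge that points at it. A minimal sketch of the two variants follows, assuming the graph is a plain dict of node name to the set of nodes it has edges towards; the free functions are hypothetical, written only to show the behaviour described above.

```python
from typing import Dict, Set


def delete_node(graph: Dict[str, Set[str]], node_name: str) -> None:
    if node_name not in graph:
        raise KeyError(f"node {node_name} does not exist")
    del graph[node_name]
    # Drop every edge that still references the deleted node.
    for edges in graph.values():
        edges.discard(node_name)


def delete_node_if_exists(graph: Dict[str, Set[str]], node_name: str) -> None:
    # Same effect, but a missing node is silently ignored.
    try:
        delete_node(graph, node_name)
    except KeyError:
        pass


graph = {"a": {"b"}, "b": {"c"}, "c": set()}
delete_node(graph, "b")
delete_node_if_exists(graph, "b")  # already gone, no error
try:
    delete_node(graph, "b")
    missing_rejected = False
except KeyError:
    missing_rejected = True
```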

add_edge(ind_node: str, dep_node: str) → None
Add an edge (dependency) between the specified nodes.
 Parameters
ind_node – The independent node to add an edge to.
dep_node – The dependent node that has a dependency on the ind_node.
 Raises
KeyError – Raised if either ind_node or dep_node does not exist.
DAGValidationError – Raised if the resulting graph is invalid.
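One way to picture the two failure modes is a sketch that validates a trial copy of the graph before committing the edge. Everything here is an assumption made for illustration: the dict-of-sets storage, the `CycleError` stand-in for DAGValidationError, and the depth-first cycle check. The library may validate differently.

```python
from typing import Dict, Set


class CycleError(Exception):
    """Hypothetical stand-in for DAGValidationError."""


def has_cycle(graph: Dict[str, Set[str]]) -> bool:
    # Depth-first search with three states: unvisited, in progress (1), done (2).
    state: Dict[str, int] = {}

    def visit(node: str) -> bool:
        if state.get(node) == 1:  # back edge: node is on the current path
            return True
        if state.get(node) == 2:
            return False
        state[node] = 1
        found = any(visit(target) for target in graph[node])
        state[node] = 2
        return found

    return any(visit(node) for node in graph)


def add_edge(graph: Dict[str, Set[str]], ind_node: str, dep_node: str) -> None:
    if ind_node not in graph or dep_node not in graph:
        raise KeyError("one or more nodes do not exist in graph")
    # Validate a trial copy so a rejected edge leaves the graph untouched.
    trial = {name: set(edges) for name, edges in graph.items()}
    trial[ind_node].add(dep_node)
    if has_cycle(trial):
        raise CycleError(f"edge {ind_node} -> {dep_node} would create a cycle")
    graph[ind_node].add(dep_node)


graph = {"a": set(), "b": set()}
add_edge(graph, "a", "b")
try:
    add_edge(graph, "b", "a")  # would close the loop a -> b -> a
    cycle_rejected = False
except CycleError:
    cycle_rejected = True
```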

delete_edge(ind_node: str, dep_node: str) → None
Delete an edge from the graph.
 Parameters
ind_node – The independent node to delete an edge from.
dep_node – The dependent node that has a dependency on the ind_node.
 Raises
KeyError – Raised when the edge doesn’t already exist.

transpose() → runway.cfngin.dag.DAG
Build a new graph with the edges reversed.
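Reversing the edges can be sketched on a plain dict of node name to the set of nodes it has edges towards. That storage and the free function are assumptions for illustration, not the library's code.

```python
from typing import Dict, Set


def transpose(graph: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    # Keep every node, even those that end up with no outgoing edges.
    reversed_graph: Dict[str, Set[str]] = {node: set() for node in graph}
    for node, edges in graph.items():
        for target in edges:
            reversed_graph[target].add(node)
    return reversed_graph


graph = {"a": {"b", "c"}, "b": {"c"}, "c": set()}
reversed_graph = transpose(graph)
```

Transposing twice gives back the original graph, which is a convenient sanity check.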

walk(walk_func: Callable[[str], typing.Any]) → None
Walk each node of the graph in reverse topological order.
This can be used to perform a set of operations where each operation depends on the one before it. Walking happens serially and is not parallelized.
 Parameters
walk_func – The function to be called on each node of the graph.
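A reverse topological order visits a node only after every node it has edges towards. A depth-first post-order traversal produces such an order, as in the sketch below. The dict-of-sets storage, the example node names, and the free function are assumptions for illustration; the library may derive the order differently.

```python
from typing import Any, Callable, Dict, List, Set


def walk(graph: Dict[str, Set[str]], walk_func: Callable[[str], Any]) -> None:
    visited: Set[str] = set()

    def visit(node: str) -> None:
        if node in visited:
            return
        visited.add(node)
        for target in sorted(graph[node]):
            visit(target)
        # Post-order: called only after everything this node points at.
        walk_func(node)

    # One node at a time, never in parallel.
    for node in sorted(graph):
        visit(node)


graph = {"app": {"database", "vpc"}, "database": {"vpc"}, "vpc": set()}
visited_order: List[str] = []
walk(graph, visited_order.append)
```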

transitive_reduction() → None
Perform a transitive reduction on the DAG.
The transitive reduction of a graph is a graph with as few edges as possible with the same reachability as the original graph.
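For an acyclic graph, an edge is redundant when its target can also be reached through another direct successor. The sketch below applies that rule to a dict of node name to the set of nodes it has edges towards. It returns a new dict rather than modifying in place, and both the storage and the function names are assumptions for illustration.

```python
from typing import Dict, Set


def reachable(graph: Dict[str, Set[str]], start: str) -> Set[str]:
    # All nodes reachable from start by following one or more edges.
    seen: Set[str] = set()
    stack = list(graph[start])
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph[node])
    return seen


def transitive_reduction(graph: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    # Assumes the input is acyclic.
    reduced = {node: set(edges) for node, edges in graph.items()}
    for node, edges in graph.items():
        for target in edges:
            # Anything reachable from a direct successor is already reachable
            # through it, so a direct edge to the same node is redundant.
            reduced[node] -= reachable(graph, target)
    return reduced


graph = {"a": {"b", "c"}, "b": {"c"}, "c": set()}
reduced = transitive_reduction(graph)
```

Here the edge from `a` to `c` is dropped because `c` is still reachable through `b`.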

rename_edges(old_node_name: str, new_node_name: str) → None
Change references to a node in existing edges.
 Parameters
old_node_name – The old name for the node.
new_node_name – The new name for the node.

predecessors(node: str) → List[str]
Return a list of all immediate predecessors of the given node.
 Parameters
node (str) – The node whose predecessors you want to find.
 Returns
A list of nodes that are immediate predecessors to node.
 Return type
List[str]
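An immediate predecessor is any node with an edge pointing directly at the given node. A minimal sketch, assuming a dict of node name to the set of nodes it has edges towards (the function is hypothetical, not the library's code):

```python
from typing import Dict, List, Set


def predecessors(graph: Dict[str, Set[str]], node: str) -> List[str]:
    # Scan every node and keep those with a direct edge to the target.
    return [name for name, edges in graph.items() if node in edges]


graph = {"a": {"c"}, "b": {"c"}, "c": set()}
direct = predecessors(graph, "c")
```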

downstream(node: str) → List[str]
Return a list of all nodes this node has edges towards.
 Parameters
node – The node whose downstream nodes you want to find.
 Returns
A list of nodes that are immediately downstream from the node.

all_downstreams(node: str) → List[str]
Return a list of all nodes downstream in topological order.
 Parameters
node – The node whose downstream nodes you want to find.
 Returns
A list of nodes that are downstream from the node.
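The contrast between immediate and transitive downstream nodes can be sketched as follows. The dict-of-sets storage and both functions are assumptions for illustration. The topological ordering here comes from reversing a depth-first post-order, which is one of several valid ways to obtain it.

```python
from typing import Dict, List, Set


def downstream(graph: Dict[str, Set[str]], node: str) -> List[str]:
    # Only the nodes reached by a single edge.
    return sorted(graph[node])


def all_downstreams(graph: Dict[str, Set[str]], node: str) -> List[str]:
    seen: Set[str] = set()
    post_order: List[str] = []

    def visit(current: str) -> None:
        for target in sorted(graph[current]):
            if target not in seen:
                seen.add(target)
                visit(target)
                post_order.append(target)

    visit(node)
    # Reversed post-order lists each node before the nodes it points at.
    return post_order[::-1]


graph = {"a": {"b"}, "b": {"c"}, "c": set(), "d": {"a"}}
immediate = downstream(graph, "a")
everything = all_downstreams(graph, "a")
```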

filter(nodes: List[str]) → DAG
Return a new DAG with only the given nodes and their dependencies.
 Parameters
nodes – The nodes you are interested in.
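A sketch of the filtering idea, under the assumption that "dependencies" means every node reachable by following edges from the requested nodes. The dict-of-sets storage, the example names, and `filter_graph` itself are hypothetical and only illustrate the description above.

```python
from typing import Dict, List, Set


def filter_graph(graph: Dict[str, Set[str]], nodes: List[str]) -> Dict[str, Set[str]]:
    keep: Set[str] = set()
    stack = list(nodes)
    while stack:
        node = stack.pop()
        if node not in keep:
            keep.add(node)
            stack.extend(graph[node])
    # keep is closed under edges, so the copied edges never dangle.
    return {node: set(graph[node]) for node in graph if node in keep}


graph = {"app": {"db"}, "db": {"vpc"}, "vpc": set(), "dns": set()}
filtered = filter_graph(graph, ["db"])
```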

from_dict(graph_dict: Dict[str, Union[Iterable[str], Any]]) → None
Reset the graph and build it from the passed dictionary.
The dictionary takes the form of {node_name: [directed edges]}
 Parameters
graph_dict – The dictionary used to create the graph.
 Raises
TypeError – Raised if the values in the dict are not lists.
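The `{node_name: [directed edges]}` form can be illustrated with a sketch that builds a dict-of-sets graph from such a dictionary. Unlike the method documented above, this hypothetical function returns a new dict instead of resetting an instance. Its type check (rejecting bare strings and non-iterables) and the KeyError for unknown targets are assumptions of the sketch.

```python
from collections.abc import Iterable
from typing import Any, Dict, Set


def from_dict(graph_dict: Dict[str, Any]) -> Dict[str, Set[str]]:
    # Start from an empty graph, then register every key as a node.
    graph: Dict[str, Set[str]] = {node: set() for node in graph_dict}
    for node, edges in graph_dict.items():
        # Sketch-level check: a bare string is iterable but is rarely
        # what the caller meant, so it is rejected too.
        if isinstance(edges, str) or not isinstance(edges, Iterable):
            raise TypeError("dict values must be lists")
        for target in edges:
            if target not in graph:
                raise KeyError(f"node {target} does not exist")
            graph[node].add(target)
    return graph


graph = from_dict({"a": ["b", "c"], "b": ["c"], "c": []})
try:
    from_dict({"a": 42})
    bad_value_rejected = False
except TypeError:
    bad_value_rejected = True
```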

topological_sort() → List[str]
Return a topological ordering of the DAG.
 Raises
ValueError – Raised if the graph is not acyclic.
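One common way to produce a topological ordering and detect cycles at the same time is Kahn's algorithm, sketched below on a dict of node name to the set of nodes it has edges towards. This is a swapped-in illustration of the technique, not a statement of how the library implements the method.

```python
from collections import deque
from typing import Deque, Dict, List, Set


def topological_sort(graph: Dict[str, Set[str]]) -> List[str]:
    # Count incoming edges for every node.
    in_degree = {node: 0 for node in graph}
    for edges in graph.values():
        for target in edges:
            in_degree[target] += 1

    # Nodes with no incoming edges can be emitted immediately.
    ready: Deque[str] = deque(sorted(n for n, d in in_degree.items() if d == 0))
    order: List[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for target in sorted(graph[node]):
            in_degree[target] -= 1
            if in_degree[target] == 0:
                ready.append(target)

    # Any node left over sits on a cycle.
    if len(order) != len(graph):
        raise ValueError("graph is not acyclic")
    return order


order = topological_sort({"a": {"b", "c"}, "b": {"d"}, "c": {"d"}, "d": set()})
try:
    topological_sort({"x": {"y"}, "y": {"x"}})
    cycle_detected = False
except ValueError:
    cycle_detected = True
```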


runway.cfngin.dag.walk(dag: runway.cfngin.dag.DAG, walk_func: Callable[[str], typing.Any]) → None
Walk a DAG.

class runway.cfngin.dag.UnlimitedSemaphore
Bases: object
Behaves like threading.Semaphore, except that acquire always succeeds.
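The idea can be sketched as an object that exposes the same `acquire`/`release` surface as a semaphore but never blocks, so calling code does not need to special-case "no limit". `UnlimitedSemaphoreSketch`, `run_step`, and the `True` return value are assumptions for illustration, not the library's class.

```python
import threading
from typing import Any, List, Union


class UnlimitedSemaphoreSketch:
    """Hypothetical stand-in; not the library's class."""

    def acquire(self, *args: Any) -> bool:
        # Never blocks and never fails, whatever the arguments.
        return True

    def release(self) -> None:
        # Nothing was reserved, so there is nothing to give back.
        return None


def run_step(
    semaphore: Union[threading.Semaphore, UnlimitedSemaphoreSketch],
    log: List[str],
    name: str,
) -> None:
    # The caller treats both kinds of semaphore identically.
    semaphore.acquire()
    try:
        log.append(name)
    finally:
        semaphore.release()


log: List[str] = []
run_step(threading.Semaphore(1), log, "limited")
run_step(UnlimitedSemaphoreSketch(), log, "unlimited")
```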

class runway.cfngin.dag.ThreadedWalker(semaphore: Union[threading.Semaphore, UnlimitedSemaphore])
Bases: object
Walk a DAG as quickly as the graph topology allows, using threads.
Instantiate class.
 Parameters
semaphore – A semaphore object which can be used to control how many steps are executed in parallel.

walk(dag: runway.cfngin.dag.DAG, walk_func: Callable[[str], typing.Any]) → None
Walk each node of the graph, in parallel if it can.
The walk_func is only called once the node's dependencies have been satisfied.
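A threaded walk of this kind can be sketched with one thread per node, where each thread waits for the nodes it depends on and then takes a semaphore slot before calling `walk_func`. The sketch assumes an acyclic graph stored as a dict of node name to the set of nodes that must finish first. `threaded_walk` and the example names are hypothetical; the library's scheduling may differ.

```python
import threading
from typing import Any, Callable, Dict, List, Set


def threaded_walk(
    graph: Dict[str, Set[str]],
    walk_func: Callable[[str], Any],
    semaphore: threading.Semaphore,
) -> None:
    # One completion flag per node.
    done = {node: threading.Event() for node in graph}

    def run(node: str) -> None:
        # Block until every dependency has finished.
        for dep in graph[node]:
            done[dep].wait()
        # The semaphore caps how many walk_func calls run at once.
        semaphore.acquire()
        try:
            walk_func(node)
        finally:
            semaphore.release()
            done[node].set()

    threads = [threading.Thread(target=run, args=(node,)) for node in graph]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


graph = {"app": {"database"}, "database": {"vpc"}, "vpc": set()}
order: List[str] = []
lock = threading.Lock()


def record(node: str) -> None:
    with lock:
        order.append(node)


threaded_walk(graph, record, threading.Semaphore(2))
```

Because this example graph is a single chain, the nodes complete in one fixed order even though each runs in its own thread. Independent branches would be free to run concurrently, up to the semaphore's limit.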