runway.cfngin.dag package

CFNgin directed acyclic graph (DAG) implementation.

exception runway.cfngin.dag.DAGValidationError[source]

Bases: Exception

Raised when DAG validation fails.

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class runway.cfngin.dag.DAG[source]

Bases: object

Directed acyclic graph implementation.

Instantiate a new DAG with no nodes or edges.

add_node(node_name: str)None[source]

Add a node if it does not exist yet, or error out.

Parameters

node_name – The unique name of the node to add.

Raises

KeyError – Raised if a node with the same name already exist in the graph

add_node_if_not_exists(node_name: str)None[source]

Add a node if it does not exist yet, ignoring duplicates.

Parameters

node_name – The name of the node to add.

delete_node(node_name: str)None[source]

Delete this node and all edges referencing it.

Parameters

node_name – The name of the node to delete.

Raises

KeyError – Raised if the node does not exist in the graph.

delete_node_if_exists(node_name: str)None[source]

Delete this node and all edges referencing it.

Ignores any node that is not in the graph, rather than throwing an exception.

Parameters

node_name – The name of the node to delete.

add_edge(ind_node: str, dep_node: str)None[source]

Add an edge (dependency) between the specified nodes.

Parameters
  • ind_node – The independent node to add an edge to.

  • dep_node – The dependent node that has a dependency on the ind_node.

Raises
delete_edge(ind_node: str, dep_node: str)None[source]

Delete an edge from the graph.

Parameters
  • ind_node – The independent node to delete an edge from.

  • dep_node – The dependent node that has a dependency on the ind_node.

Raises

KeyError – Raised when the edge doesn’t already exist.

transpose()runway.cfngin.dag.DAG[source]

Build a new graph with the edges reversed.

walk(walk_func: Callable[[str], typing.Any])None[source]

Walk each node of the graph in reverse topological order.

This can be used to perform a set of operations, where the next operation depends on the previous operation. It’s important to note that walking happens serially, and is not parallelized.

Parameters

walk_func – The function to be called on each node of the graph.

transitive_reduction()None[source]

Perform a transitive reduction on the DAG.

The transitive reduction of a graph is a graph with as few edges as possible with the same reachability as the original graph.

See https://en.wikipedia.org/wiki/Transitive_reduction

rename_edges(old_node_name: str, new_node_name: str)None[source]

Change references to a node in existing edges.

Parameters
  • old_node_name – The old name for the node.

  • new_node_name – The new name for the node.

predecessors(node: str)List[str][source]

Return a list of all immediate predecessors of the given node.

Parameters

node (str) – The node whose predecessors you want to find.

Returns

A list of nodes that are immediate predecessors to node.

Return type

List[str]

downstream(node: str)List[str][source]

Return a list of all nodes this node has edges towards.

Parameters

node – The node whose downstream nodes you want to find.

Returns

A list of nodes that are immediately downstream from the node.

all_downstreams(node: str)List[str][source]

Return a list of all nodes downstream in topological order.

Parameters

node – The node whose downstream nodes you want to find.

Returns

A list of nodes that are downstream from the node.

filter(nodes: List[str])DAG[source]

Return a new DAG with only the given nodes and their dependencies.

Parameters

nodes – The nodes you are interested in.

all_leaves()List[str][source]

Return a list of all leaves (nodes with no downstreams).

from_dict(graph_dict: Dict[str, Union[Iterable[str], Any]])None[source]

Reset the graph and build it from the passed dictionary.

The dictionary takes the form of {node_name: [directed edges]}

Parameters

graph_dict – The dictionary used to create the graph.

Raises

TypeError – Raised if the value of items in the dict are not lists.

reset_graph()None[source]

Restore the graph to an empty state.

ind_nodes()List[str][source]

Return a list of all nodes in the graph with no dependencies.

validate()Tuple[bool, str][source]

Return (Boolean, message) of whether DAG is valid.

topological_sort()List[str][source]

Return a topological ordering of the DAG.

Raises

ValueError – Raised if the graph is not acyclic.

size()int[source]

Count of nodes in the graph.

__len__()int[source]

How the length of a DAG is calculated.

runway.cfngin.dag.walk(dag: runway.cfngin.dag.DAG, walk_func: Callable[[str], typing.Any])None[source]

Walk a DAG.

class runway.cfngin.dag.UnlimitedSemaphore[source]

Bases: object

threading.Semaphore, but acquire always succeeds.

acquire(*args: typing.Any)typing.Any[source]

Do nothing.

release()typing.Any[source]

Do nothing.

class runway.cfngin.dag.ThreadedWalker(semaphore: Union[threading.Semaphore, UnlimitedSemaphore])[source]

Bases: object

Walk a DAG as quickly as the graph topology allows, using threads.

Instantiate class.

Parameters

semaphore – A semaphore object which can be used to control how many steps are executed in parallel.

walk(dag: runway.cfngin.dag.DAG, walk_func: Callable[[str], typing.Any])None[source]

Walk each node of the graph, in parallel if it can.

The walk_func is only called when the nodes dependencies have been satisfied.