runway.cfngin.dag package

CFNgin directed acyclic graph (DAG) implementation.

class runway.cfngin.dag.DAG[source]

Bases: object

Directed acyclic graph implementation.

Instantiate a new DAG with no nodes or edges.

add_edge(ind_node, dep_node)[source]

Add an edge (dependency) between the specified nodes.

Parameters
  • ind_node (str) – The independent node to add an edge to.

  • dep_node (str) – The dependent node that has a dependency on the ind_node.

Raises
add_node(node_name)[source]

Add a node if it does not exist yet, or error out.

Parameters

node_name (str) – The unique name of the node to add.

Raises

KeyError – Raised if a node with the same name already exist in the graph

add_node_if_not_exists(node_name)[source]

Add a node if it does not exist yet, ignoring duplicates.

Parameters

node_name (str) – The name of the node to add.

all_downstreams(node)[source]

Return a list of all nodes downstream in topological order.

Parameters

node (str) – The node whose downstream nodes you want to find.

Returns

A list of nodes that are downstream from the node.

Return type

List[str]

all_leaves()[source]

Return a list of all leaves (nodes with no downstreams).

Returns

A list of all the nodes with no downstreams.

Return type

List[str]

delete_edge(ind_node, dep_node)[source]

Delete an edge from the graph.

Parameters
  • ind_node (str) – The independent node to delete an edge from.

  • dep_node (str) – The dependent node that has a dependency on the ind_node.

Raises

KeyError – Raised when the edge doesn’t already exist.

delete_node(node_name)[source]

Delete this node and all edges referencing it.

Parameters

node_name (str) – The name of the node to delete.

Raises

KeyError – Raised if the node does not exist in the graph.

delete_node_if_exists(node_name)[source]

Delete this node and all edges referencing it.

Ignores any node that is not in the graph, rather than throwing an exception.

Parameters

node_name (str) – The name of the node to delete.

downstream(node)[source]

Return a list of all nodes this node has edges towards.

Parameters

node (str) – The node whose downstream nodes you want to find.

Returns

A list of nodes that are immediately downstream from the node.

Return type

List[str]

filter(nodes)[source]

Return a new DAG with only the given nodes and their dependencies.

Parameters

nodes (list) – The nodes you are interested in.

Returns

The filtered graph.

Return type

DAG

from_dict(graph_dict)[source]

Reset the graph and build it from the passed dictionary.

The dictionary takes the form of {node_name: [directed edges]}

Parameters

graph_dict (Dict[str, Any]) – The dictionary used to create the graph.

Raises

TypeError – Raised if the value of items in the dict are not lists.

ind_nodes()[source]

Return a list of all nodes in the graph with no dependencies.

Returns

A list of all independent nodes.

Return type

List[str]

predecessors(node)[source]

Return a list of all immediate predecessors of the given node.

Parameters

node (str) – The node whose predecessors you want to find.

Returns

A list of nodes that are immediate predecessors to node.

Return type

List[str]

rename_edges(old_node_name, new_node_name)[source]

Change references to a node in existing edges.

Parameters
  • old_node_name (str) – The old name for the node.

  • new_node_name (str) – The new name for the node.

reset_graph()[source]

Restore the graph to an empty state.

size()[source]

Count of nodes in the graph.

topological_sort()[source]

Return a topological ordering of the DAG.

Returns

A list of topologically sorted nodes in the graph.

Return type

list

Raises

ValueError – Raised if the graph is not acyclic.

transitive_reduction()[source]

Perform a transitive reduction on the DAG.

The transitive reduction of a graph is a graph with as few edges as possible with the same reachability as the original graph.

See https://en.wikipedia.org/wiki/Transitive_reduction

transpose()[source]

Build a new graph with the edges reversed.

Returns

The transposed graph.

Return type

runway.cfngin.dag.DAG

validate()[source]

Return (Boolean, message) of whether DAG is valid.

Returns

Tuple[bool, str]

walk(walk_func)[source]

Walk each node of the graph in reverse topological order.

This can be used to perform a set of operations, where the next operation depends on the previous operation. It’s important to note that walking happens serially, and is not parallelized.

Parameters

walk_func (types.FunctionType) – The function to be called on each node of the graph.

exception runway.cfngin.dag.DAGValidationError[source]

Bases: Exception

Raised when DAG validation fails.

class runway.cfngin.dag.ThreadedWalker(semaphore)[source]

Bases: object

Walk a DAG as quickly as the graph topology allows, using threads.

Instantiate class.

Parameters

semaphore (threading.Semaphore) – a semaphore object which can be used to control how many steps are executed in parallel.

walk(dag, walk_func)[source]

Walk each node of the graph, in parallel if it can.

The walk_func is only called when the nodes dependencies have been satisfied.

class runway.cfngin.dag.UnlimitedSemaphore[source]

Bases: object

threading.Semaphore, but acquire always succeeds.

acquire(*args)[source]

Do nothing.

release()[source]

Do nothing.

runway.cfngin.dag.walk(dag, walk_func)[source]

Walk a DAG.