runway.cfngin.dag package
CFNgin directed acyclic graph (DAG) implementation.
- exception runway.cfngin.dag.DAGValidationError
Bases: Exception
Raised when DAG validation fails.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note()
Exception.add_note(note) – add a note to the exception
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class runway.cfngin.dag.DAG
Bases: object
Directed acyclic graph implementation.
- add_node(node_name: str) → None
Add a node to the graph, or raise an error if it already exists.
- Parameters
node_name – The unique name of the node to add.
- Raises
KeyError – Raised if a node with the same name already exists in the graph.
- add_node_if_not_exists(node_name: str) → None
Add a node if it does not exist yet, ignoring duplicates.
- Parameters
node_name – The name of the node to add.
- delete_node(node_name: str) → None
Delete this node and all edges referencing it.
- Parameters
node_name – The name of the node to delete.
- Raises
KeyError – Raised if the node does not exist in the graph.
- delete_node_if_exists(node_name: str) → None
Delete this node and all edges referencing it.
Ignores any node that is not in the graph, rather than throwing an exception.
- Parameters
node_name – The name of the node to delete.
- add_edge(ind_node: str, dep_node: str) → None
Add an edge (dependency) between the specified nodes.
- Parameters
ind_node – The independent node to add an edge to.
dep_node – The dependent node that has a dependency on the ind_node.
- Raises
KeyError – Raised if either ind_node or dep_node does not exist in the graph.
DAGValidationError – Raised if the resulting graph is invalid.
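The add-then-validate behaviour described for add_node and add_edge can be sketched as below. This is a minimal stand-in, not runway's implementation: the `SketchDAG` class, its dict-of-sets storage, the `_reaches` helper, and the locally defined `DAGValidationError` are all assumptions made for illustration.

```python
from typing import Dict, Set


class DAGValidationError(Exception):
    """Local stand-in for the documented exception (assumption)."""


class SketchDAG:
    """Hypothetical dict-of-sets graph; not runway's DAG class."""

    def __init__(self) -> None:
        # Each node maps to the set of nodes its edges point at.
        self.graph: Dict[str, Set[str]] = {}

    def add_node(self, node_name: str) -> None:
        if node_name in self.graph:
            raise KeyError(f"node {node_name} already exists")
        self.graph[node_name] = set()

    def _reaches(self, start: str, target: str) -> bool:
        """Return True if target can be reached from start along edges."""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node not in seen:
                seen.add(node)
                stack.extend(self.graph[node])
        return False

    def add_edge(self, ind_node: str, dep_node: str) -> None:
        if ind_node not in self.graph or dep_node not in self.graph:
            raise KeyError("one or more nodes do not exist in the graph")
        # An edge ind_node -> dep_node closes a cycle when dep_node already
        # reaches ind_node, so reject it before mutating the graph.
        if self._reaches(dep_node, ind_node):
            raise DAGValidationError(
                f"edge {ind_node} -> {dep_node} would create a cycle"
            )
        self.graph[ind_node].add(dep_node)


dag = SketchDAG()
dag.add_node("a")
dag.add_node("b")
dag.add_edge("a", "b")
```

Checking for a cycle before adding the edge keeps the graph unchanged when validation fails, which is one way to honour the two documented error cases.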
- delete_edge(ind_node: str, dep_node: str) → None
Delete an edge from the graph.
- Parameters
ind_node – The independent node to delete an edge from.
dep_node – The dependent node that has a dependency on the ind_node.
- Raises
KeyError – Raised if the edge does not exist in the graph.
- transpose() → runway.cfngin.dag.DAG
Build a new graph with the edges reversed.
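Reversing edges can be illustrated on a plain dictionary. The sketch below assumes a graph stored as a mapping from each node to the set of nodes its edges point at; the standalone `transpose` function is hypothetical and only mirrors the idea, not the method's actual code.

```python
from typing import Dict, Set


def transpose(graph: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Return a new graph in which every edge u -> v becomes v -> u."""
    reversed_graph: Dict[str, Set[str]] = {node: set() for node in graph}
    for node, targets in graph.items():
        for target in targets:
            reversed_graph[target].add(node)
    return reversed_graph


graph = {"a": {"b", "c"}, "b": {"c"}, "c": set()}
flipped = transpose(graph)
```

Transposing twice gives back the original edges, which makes a convenient sanity check.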
- walk(walk_func: Callable[[str], Any]) → None
Walk each node of the graph in reverse topological order.
Use this to perform a set of operations in which each operation depends on the one before it. Walking happens serially; it is not parallelized.
- Parameters
walk_func – The function to be called on each node of the graph.
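A serial walk in reverse topological order can be sketched with the standard library. The example assumes edges are stored as a mapping from a node to the set of nodes it points at; the standalone `walk` helper and the node names are hypothetical, and `graphlib` is used here for brevity rather than because the class is known to rely on it.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Set


def walk(graph: Dict[str, Set[str]], walk_func: Callable[[str], Any]) -> None:
    """Call walk_func on each node, one at a time."""
    # TopologicalSorter treats each mapping value as the nodes that must
    # come first. With edges stored as node -> targets, static_order()
    # therefore yields targets before the nodes that point at them, which
    # is a reverse topological order of the graph.
    for node in TopologicalSorter(graph).static_order():
        walk_func(node)


graph = {"app": {"db", "vpc"}, "db": {"vpc"}, "vpc": set()}
visited: List[str] = []
walk(graph, visited.append)
```

In this graph the order is fully determined by the edges: `vpc` is visited first and `app` last.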
- transitive_reduction() → None
Perform a transitive reduction on the DAG.
The transitive reduction of a graph is a graph with as few edges as possible with the same reachability as the original graph.
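As a small illustration of the definition above, the sketch below removes every direct edge that is already implied by a longer path. It assumes an acyclic dict-of-sets graph and returns a new mapping instead of modifying in place; the `reachable` and `transitive_reduction` functions are hypothetical helpers, not the method's implementation.

```python
from typing import Dict, Set


def reachable(graph: Dict[str, Set[str]], start: str) -> Set[str]:
    """All nodes reachable from start by following one or more edges."""
    seen: Set[str] = set()
    stack = list(graph[start])
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph[node])
    return seen


def transitive_reduction(graph: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Return a copy of an acyclic graph without redundant edges."""
    reduced = {node: set(targets) for node, targets in graph.items()}
    for node, targets in graph.items():
        for target in targets:
            # Anything reachable from a direct target is still reachable
            # from node through that target, so a direct edge is redundant.
            reduced[node] -= reachable(graph, target)
    return reduced


graph = {"a": {"b", "c"}, "b": {"c"}, "c": set()}
reduced = transitive_reduction(graph)
```

Here the edge from `a` to `c` is dropped because `c` remains reachable through `b`.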
- rename_edges(old_node_name: str, new_node_name: str) → None
Change references to a node in existing edges.
- Parameters
old_node_name – The old name for the node.
new_node_name – The new name for the node.
- predecessors(node: str) → List[str]
Return a list of all immediate predecessors of the given node.
- Parameters
node (str) – The node whose predecessors you want to find.
- Returns
A list of nodes that are immediate predecessors to node.
- Return type
List[str]
- downstream(node: str) → List[str]
Return a list of all nodes this node has edges towards.
- Parameters
node – The node whose downstream nodes you want to find.
- Returns
A list of nodes that are immediately downstream from the node.
- all_downstreams(node: str) → List[str]
Return a list of all nodes downstream in topological order.
- Parameters
node – The node whose downstream nodes you want to find.
- Returns
A list of nodes that are downstream from the node.
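The three queries predecessors, downstream and all_downstreams differ only in direction and depth. The sketch below shows one way to express them over a dict-of-sets graph; the standalone functions, the sorted return order of `downstream`, and the use of `graphlib` for ordering are assumptions for illustration.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

Graph = Dict[str, Set[str]]


def predecessors(graph: Graph, node: str) -> List[str]:
    """Nodes with an edge pointing directly at node."""
    return [name for name, targets in graph.items() if node in targets]


def downstream(graph: Graph, node: str) -> List[str]:
    """Nodes that node points at directly."""
    return sorted(graph[node])


def all_downstreams(graph: Graph, node: str) -> List[str]:
    """Every node reachable from node, listed in topological order."""
    seen: Set[str] = set()
    stack = list(graph[node])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(graph[current])
    # static_order() lists targets before the nodes pointing at them, so
    # reverse it to put each node ahead of the nodes it points at.
    order = list(TopologicalSorter(graph).static_order())
    order.reverse()
    return [name for name in order if name in seen]


graph = {"a": {"b"}, "b": {"c"}, "c": set(), "d": {"c"}}
direct = downstream(graph, "a")
everything = all_downstreams(graph, "a")
parents = predecessors(graph, "c")
```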
- filter(nodes: List[str]) → runway.cfngin.dag.DAG
Return a new DAG with only the given nodes and their dependencies.
- Parameters
nodes – The nodes you are interested in.
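One reading of filtering is "keep the requested nodes plus everything reachable from them". The sketch below follows that reading on a dict-of-sets graph; treating outgoing edges as dependencies is an assumption, and `filter_graph` and the node names are hypothetical.

```python
from typing import Dict, Iterable, Set


def filter_graph(
    graph: Dict[str, Set[str]], nodes: Iterable[str]
) -> Dict[str, Set[str]]:
    """Return a new graph with only nodes and what they reach."""
    keep: Set[str] = set()
    stack = list(nodes)
    while stack:
        node = stack.pop()
        if node not in keep:
            keep.add(node)
            stack.extend(graph[node])
    # Copy the edge sets so the result is independent of the input graph.
    return {node: set(graph[node]) for node in graph if node in keep}


graph = {"app": {"db"}, "db": {"vpc"}, "vpc": set(), "docs": set()}
subset = filter_graph(graph, ["db"])
```

Nodes that nothing in the selection depends on, such as `app` and `docs` here, are left out of the result.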
- from_dict(graph_dict: Dict[str, Union[Iterable[str], Any]]) → None
Reset the graph and build it from the passed dictionary.
The dictionary takes the form of {node_name: [directed edges]}
- Parameters
graph_dict – The dictionary used to create the graph.
- Raises
TypeError – Raised if any value in the dict is not a list.
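The {node_name: [directed edges]} form can be turned into a graph in two passes: create every node, then add the edges. The sketch below is a hedged illustration, not the method itself: the standalone `from_dict` returns a dict-of-sets graph, follows the Raises note literally by accepting only list values, and does not validate for cycles.

```python
from typing import Any, Dict, Set


def from_dict(graph_dict: Dict[str, Any]) -> Dict[str, Set[str]]:
    """Build a dict-of-sets graph from {node_name: [directed edges]}."""
    # First pass: register every node so edges can be checked against it.
    graph: Dict[str, Set[str]] = {node: set() for node in graph_dict}
    # Second pass: add each edge, rejecting malformed values.
    for ind_node, dep_nodes in graph_dict.items():
        if not isinstance(dep_nodes, list):
            raise TypeError(f"{ind_node}: dict values must be lists")
        for dep_node in dep_nodes:
            if dep_node not in graph:
                raise KeyError(f"{dep_node} is not a node in the graph")
            graph[ind_node].add(dep_node)
    return graph


graph = from_dict({"a": ["b", "c"], "b": ["c"], "c": []})
```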
- topological_sort() → List[str]
Return a topological ordering of the DAG.
- Raises
ValueError – Raised if the graph is not acyclic.
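A topological ordering, together with the cycle check behind the ValueError, can be sketched with Kahn's algorithm. This is an illustrative version over a dict-of-sets graph; the standalone function and its tie-breaking by sorted node name are assumptions, and the source does not state which algorithm the method uses.

```python
from collections import deque
from typing import Dict, List, Set


def topological_sort(graph: Dict[str, Set[str]]) -> List[str]:
    """Order nodes so each one comes before the nodes it points at."""
    in_degree = {node: 0 for node in graph}
    for targets in graph.values():
        for target in targets:
            in_degree[target] += 1

    # Start with the nodes that nothing points at.
    queue = deque(node for node, degree in in_degree.items() if degree == 0)
    ordered: List[str] = []
    while queue:
        node = queue.popleft()
        ordered.append(node)
        for target in sorted(graph[node]):
            in_degree[target] -= 1
            if in_degree[target] == 0:
                queue.append(target)

    # Nodes on a cycle never reach in-degree zero, so they are never emitted.
    if len(ordered) != len(graph):
        raise ValueError("graph is not acyclic")
    return ordered


order = topological_sort({"a": {"b", "c"}, "b": {"c"}, "c": set()})
```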
- __new__(**kwargs)
- runway.cfngin.dag.walk(dag: runway.cfngin.dag.DAG, walk_func: Callable[[str], Any]) → None
Walk a DAG.
- class runway.cfngin.dag.UnlimitedSemaphore
Bases: object
Like threading.Semaphore, but acquire always succeeds.
- __init__()
- __new__(**kwargs)
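The idea of a semaphore whose acquire always succeeds can be shown with a small stand-in. The `AlwaysAvailableSemaphore` class and `run_guarded` helper below are hypothetical and only mirror the documented behaviour; they are not the class's source, and the return value of `acquire` is an assumption.

```python
import threading
from typing import List, Union


class AlwaysAvailableSemaphore:
    """Hypothetical stand-in: same acquire/release surface, no limit."""

    def acquire(self, *args: object) -> bool:
        # Never blocks and never fails, however many callers hold it.
        return True

    def release(self) -> None:
        # Nothing to give back because nothing was ever counted.
        return None


def run_guarded(
    semaphore: Union[threading.Semaphore, AlwaysAvailableSemaphore],
    results: List[int],
    value: int,
) -> None:
    """Do some work while holding whichever semaphore was supplied."""
    semaphore.acquire()
    try:
        results.append(value)
    finally:
        semaphore.release()


bounded = threading.Semaphore(1)
unbounded = AlwaysAvailableSemaphore()
limited: List[int] = []
unlimited: List[int] = []
for i in range(3):
    run_guarded(bounded, limited, i)
    run_guarded(unbounded, unlimited, i)
```

Because both objects expose the same two methods, calling code does not need to know whether concurrency is capped.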
- class runway.cfngin.dag.ThreadedWalker
Bases: object
Walk a DAG as quickly as the graph topology allows, using threads.
- __init__(semaphore: Union[threading.Semaphore, UnlimitedSemaphore])
Instantiate class.
- Parameters
semaphore – A semaphore object which can be used to control how many steps are executed in parallel.
- __new__(**kwargs)
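Walking "as quickly as the graph topology allows" can be sketched by giving every node its own thread that waits for the nodes it points at, then runs under the semaphore. The `threaded_walk` function below is a hypothetical illustration, not the class's implementation; it assumes an acyclic dict-of-sets graph and does not propagate errors raised by `walk_func`.

```python
import threading
from typing import Any, Callable, Dict, List, Set


def threaded_walk(
    graph: Dict[str, Set[str]],
    walk_func: Callable[[str], Any],
    semaphore: threading.Semaphore,
) -> None:
    """Run walk_func on every node, each after the nodes it points at."""
    done = {node: threading.Event() for node in graph}

    def run(node: str) -> None:
        # Block until every node this one points at has finished.
        for dependency in graph[node]:
            done[dependency].wait()
        try:
            # The semaphore caps how many walk_func calls run at once.
            with semaphore:
                walk_func(node)
        finally:
            # Always signal completion so waiting threads are not stranded.
            done[node].set()

    threads = [threading.Thread(target=run, args=(node,)) for node in graph]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


graph = {"app": {"db", "vpc"}, "db": {"vpc"}, "vpc": set()}
order: List[str] = []
threaded_walk(graph, order.append, threading.BoundedSemaphore(2))
```

The semaphore is held only while `walk_func` runs, so threads that are still waiting on other nodes never occupy a slot.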