Skip to content

Directed graphs with global source and global sink

This class is used in AbstractWalkModelDiGraph as a wrapper (with unique global source and unique global sink) to pass the directed graph to the ILP models.

stDiGraph

stDiGraph(
    base_graph: DiGraph,
    additional_starts: (
        list | None
    ) = None,
    additional_ends: (
        list | None
    ) = None,
)

Bases: AbstractSourceSinkGraph

General directed graph with global source/sink and SCC condensation helpers.

This class now subclasses AbstractSourceSinkGraph, which performs the common augmentation (adding global source/sink and validating additional boundary nodes). The remaining logic here focuses on strongly connected component (SCC) handling and the condensation expansion used for width and incompatible sequence computations.

This class inherits from networkx.DiGraph. The graph equals base_graph plus:

  • a global source connected to all sources of base_graph and to all nodes in additional_starts;
  • a global sink connected from all sinks of base_graph and from all nodes in additional_ends.

Warning

The graph base_graph must satisfy the following properties:

  • the nodes must be strings;
  • base_graph must have at least one source (i.e. node without incoming edges), or at least one node in additional_starts;
  • base_graph must have at least one sink (i.e. node without outgoing edges), or at least one node in additional_ends.

Raises:

  • ValueError: If any of the above three conditions are not satisfied.
  • ValueError: If any node in additional_starts is not in the base graph.
  • ValueError: If any node in additional_ends is not in the base graph.
Source code in flowpaths/stdigraph.py
def __init__(
    self,
    base_graph: nx.DiGraph,
    additional_starts: list | None = None,
    additional_ends: list | None = None,
):
    """
    Wrap `base_graph` with a unique global source and a unique global sink.

    This class inherits from networkx.DiGraph. The resulting graph is `base_graph` extended with:

    - a global source, with an edge to every source of `base_graph` and to every node in `additional_starts`;
    - a global sink, with an edge from every sink of `base_graph` and from every node in `additional_ends`.

    !!! warning Warning

        `base_graph` is required to satisfy all of the following:

        - every node is a string;
        - it has at least one source (a node without incoming edges), or `additional_starts` contains at least one node;
        - it has at least one sink (a node without outgoing edges), or `additional_ends` contains at least one node.

    Parameters:
    -----------
    - `base_graph`: The directed graph to wrap.
    - `additional_starts`: Optional nodes of `base_graph` that are also connected from the global source.
    - `additional_ends`: Optional nodes of `base_graph` that are also connected to the global sink.

    Raises:
    -------
    - `ValueError`: If any of the three conditions listed in the warning does not hold.
    - `ValueError`: If a node in `additional_starts` does not belong to the base graph.
    - `ValueError`: If a node in `additional_ends` does not belong to the base graph.

    """
    # Set before delegating to the parent constructor, exactly as in the original order.
    # NOTE(review): presumably the parent's setup may (re)populate this attribute -- confirm.
    self._condensation_expanded = None

    # The augmentation and validation of the boundary nodes are delegated to the parent class.
    super().__init__(base_graph=base_graph, additional_starts=additional_starts, additional_ends=additional_ends)

compute_edge_max_reachable_value

compute_edge_max_reachable_value(
    flow_attr: str,
) -> Dict[
    Tuple[str, str], float
]

For each base edge (u,v), compute the maximum flow_attr over:

  • the edge (u,v) itself,
  • any edge reachable forward from v,
  • any edge whose head can reach u (i.e., backward reachability to u).

Efficiently uses the precomputed SCC condensation and runs dynamic programming on the condensation DAG.

Returns a dict mapping each original edge (u,v) to the computed float.

If an edge has a missing `flow_attr` (the source and sink edges), we treat its flow value as 0.

Examples

>>> import networkx as nx
>>> from flowpaths.stdigraph import stDiGraph
>>> G = nx.DiGraph()
>>> G.add_edge("a", "b", flow=1)
>>> G.add_edge("b", "c", flow=5)
>>> G.add_edge("c", "a", flow=3)  # cycle among a,b,c
>>> G.add_edge("c", "d", flow=2)
>>> H = stDiGraph(G)
>>> res = H.compute_edge_max_reachable_value("flow")
>>> # Every edge can reach an edge of weight 5 within the SCC or forward
>>> res[("a", "b")] == 5 and res[("b", "c")] == 5 and res[("c", "a")] == 5 and res[("c", "d")] == 5
True

Source code in flowpaths/stdigraph.py
def compute_edge_max_reachable_value(self, flow_attr: str) -> Dict[Tuple[str, str], float]:
    """Compute, for every edge (u,v) of this graph, the largest ``flow_attr`` among:

    - the edge (u,v) itself,
    - every edge reachable forward from v,
    - every edge whose head can reach u (i.e., backward reachability to u).

    The computation works on the precomputed SCC condensation: per-SCC maxima are
    collected in one pass over the edges, and then propagated along the condensation
    DAG with two dynamic-programming sweeps (one per direction).

    An edge that does not carry ``flow_attr`` (the source and sink edges) counts as
    having flow value 0. All per-SCC maxima start at 0.0, so the forward and backward
    candidates are never below 0.0.

    Parameters
    ----------
    flow_attr:
        Name of the edge attribute holding the flow value.

    Returns
    -------
    A dict mapping each edge (u,v) of this graph to the computed float.

    Examples
    --------
    >>> import networkx as nx
    >>> from flowpaths.stdigraph import stDiGraph
    >>> G = nx.DiGraph()
    >>> G.add_edge("a", "b", flow=1)
    >>> G.add_edge("b", "c", flow=5)
    >>> G.add_edge("c", "a", flow=3)  # cycle among a,b,c
    >>> G.add_edge("c", "d", flow=2)
    >>> # No node of G lacks incoming edges, so a start node is given explicitly
    >>> H = stDiGraph(G, additional_starts=["a"])
    >>> res = H.compute_edge_max_reachable_value("flow")
    >>> # Every edge can reach an edge of weight 5 within the SCC or forward
    >>> res[("a", "b")] == 5 and res[("b", "c")] == 5 and res[("c", "a")] == 5 and res[("c", "d")] == 5
    True
    """
    condensation: nx.DiGraph = self._condensation
    scc_of = condensation.graph["mapping"]  # original node -> condensation node

    # Per-SCC local maxima:
    #   best_leaving[c]  -- heaviest edge whose TAIL lies in SCC c (inside c or exiting it);
    #   best_entering[c] -- heaviest edge whose HEAD lies in SCC c (inside c or entering it).
    # Each edge's own weight is remembered as well, so that the final answer includes it.
    best_leaving = dict.fromkeys(condensation.nodes(), 0.0)
    best_entering = dict.fromkeys(condensation.nodes(), 0.0)
    own_weight: Dict[Tuple[str, str], float] = {}

    for tail, head, attrs in self.edges(data=True):
        weight = float(attrs.get(flow_attr, 0.0))
        own_weight[(tail, head)] = weight
        tail_scc = scc_of[tail]
        head_scc = scc_of[head]
        best_leaving[tail_scc] = max(best_leaving[tail_scc], weight)
        best_entering[head_scc] = max(best_entering[head_scc], weight)

    order = list(nx.topological_sort(condensation))

    # Forward reachability: visiting SCCs in reverse topological order guarantees that
    # every successor is final before it is folded into its predecessors.
    best_forward = dict(best_leaving)
    for scc in reversed(order):
        for successor in condensation.successors(scc):
            best_forward[scc] = max(best_forward[scc], best_forward[successor])

    # Backward reachability: visiting SCCs in topological order, each SCC pushes its
    # (already final) value to its successors.
    best_backward = dict(best_entering)
    for scc in order:
        for successor in condensation.successors(scc):
            best_backward[successor] = max(best_backward[successor], best_backward[scc])

    # Combine the edge's own weight, what is reachable from SCC(v), and what reaches SCC(u).
    return {
        (tail, head): max(
            own_weight[(tail, head)],
            best_forward[scc_of[head]],
            best_backward[scc_of[tail]],
        )
        for tail, head in self.edges()
    }

get_avg_size_of_non_trivial_SCC

get_avg_size_of_non_trivial_SCC() -> (
    int
)

Returns the average size (in terms of number of edges) of non-trivial SCCs (i.e. SCCs with at least one edge).

Source code in flowpaths/stdigraph.py
def get_avg_size_of_non_trivial_SCC(self) -> int:
    """
    Returns the average number of edges of the non-trivial SCCs (i.e. SCCs with at least one edge).

    The average is rounded down to an integer (floor division). If there is no non-trivial SCC, returns 0.
    """
    condensation = self._condensation

    # Number of member edges of every SCC; the member edges are keyed by the string form of the condensation node.
    edge_counts = (len(condensation.graph['member_edges'][str(scc)]) for scc in condensation.nodes())
    non_trivial_counts = [count for count in edge_counts if count > 0]

    if not non_trivial_counts:
        return 0
    return sum(non_trivial_counts) // len(non_trivial_counts)

get_number_of_nontrivial_SCCs

get_number_of_nontrivial_SCCs() -> (
    int
)

Returns the number of non-trivial SCCs (i.e. SCCs with at least one edge).

Source code in flowpaths/stdigraph.py
def get_number_of_nontrivial_SCCs(self) -> int:
    """
    Counts the non-trivial SCCs, i.e. the SCCs having at least one member edge.
    """
    condensation = self._condensation

    non_trivial = 0
    for scc in condensation.nodes():
        # Member edges are keyed by the string form of the condensation node.
        if len(condensation.graph['member_edges'][str(scc)]) > 0:
            non_trivial += 1
    return non_trivial

get_size_of_largest_SCC

get_size_of_largest_SCC() -> (
    int
)

Returns the size of the largest SCC (in terms of number of edges).

Source code in flowpaths/stdigraph.py
def get_size_of_largest_SCC(self) -> int:
    """
    Returns the number of edges of the largest SCC, or 0 if the condensation has no nodes.
    """
    condensation = self._condensation

    largest = 0
    for scc in condensation.nodes():
        # Member edges are keyed by the string form of the condensation node.
        size = len(condensation.graph['member_edges'][str(scc)])
        if size > largest:
            largest = size
    return largest

get_width

get_width(
    edges_to_ignore: list = None,
) -> int

Returns the width of the graph, which we define as the minimum number of \(s\)-\(t\) walks needed to cover all edges.

This is computed as the width of the condensation DAGs (minimum number of \(s\)-\(t\) paths to cover all edges), with the following modification. Nodes v in the condensation corresponding to non-trivial SCCs (i.e. SCCs with more than one node, equivalent to having at least one edge) are subdivided into an edge (v, v_expanded), all condensation in-neighbors of v are connected to v, and all condensation out-neighbors of v are connected from v_expanded.

Parameters:
  • edges_to_ignore: A list of edges in the original graph to ignore when computing the width.

    The width is then computed as above, with the exception that:

    • If an edge (u,v) in edges_to_ignore is between different SCCs, then the corresponding edge to ignore is between the two SCCs in the condensation graph, and we can ignore it when computing the normal width of the condensation.

    • If an edge (u,v) in edges_to_ignore is inside the same SCC, then we remove the edge (u,v) from (a copy of) the member edges of the SCC in the condensation. If an SCC v has no more member edges left, we can also add the condensation edge (v, v_expanded) to the list of edges to ignore when computing the width of the condensation.

Source code in flowpaths/stdigraph.py
def get_width(self, edges_to_ignore: list | None = None) -> int:
    """
    Returns the width of the graph, which we define as the minimum number of $s$-$t$ walks needed to cover all edges.

    This is computed as the width of the condensation DAGs (minimum number of $s$-$t$ paths to cover all edges), with the following modification.
    Nodes `v` in the condensation corresponding to non-trivial SCCs (i.e. SCCs with more than one node, equivalent to having at least one edge)
    are subdivided into an edge `(v, v_expanded)`, all condensation in-neighbors of `v` are connected to `v`,
    and all condensation out-neighbors of `v` are connected from `v_expanded`.

    When no edges are ignored, the result is cached in `self.condensation_width` and returned directly on later calls.

    Parameters:
    -----------
    - `edges_to_ignore`: A list of edges in the original graph to ignore when computing the width.
        An edge listed more than once is ignored only once.

        The width is then computed as above, with the exception that:

        - If an edge `(u,v)` in `edges_to_ignore` is between different SCCs,
            then the corresponding edge to ignore is between the two SCCs in the condensation graph,
            and we can ignore it when computing the normal width of the condensation.

        - If an edge `(u,v)` in `edges_to_ignore` is inside the same SCC,
            then we remove the edge `(u,v)` from (a copy of) the member edges of the SCC in the condensation.
            If an SCC `v` has no more member edges left, we can also ignore the condensation edge `(v, v_expanded)`
            when computing the width of the condensation.

    Returns:
    --------
    - The width, as returned by the maximum edge antichain computation on the expanded condensation.

    Raises:
    -------
    - `ValueError`: If an edge in `edges_to_ignore` is not an edge of the graph (raised by `is_scc_edge`).
    """

    nothing_ignored = not edges_to_ignore

    if self.condensation_width is not None and nothing_ignored:
        return self.condensation_width

    # Work on copies, so that ignoring edges never alters the data stored in the condensation
    member_edges = copy.deepcopy(self._condensation.graph['member_edges'])
    edge_multiplicity = copy.deepcopy(self._condensation.graph["edge_multiplicity"])
    utils.logger.debug(f"{__name__}: edge_multiplicity for edges in the condensation: {edge_multiplicity}")

    # Each distinct ignored edge must be accounted for exactly once: decrementing the
    # multiplicity twice for a repeated edge would under-count the condensation edge.
    already_ignored = set()
    for u, v in (edges_to_ignore or []):
        if (u, v) in already_ignored:
            continue
        already_ignored.add((u, v))

        if not self.is_scc_edge(u, v):
            # (u,v) is an edge between different SCCs:
            # one fewer original edge needs to be covered on the corresponding condensation edge
            edge_multiplicity[self._edge_to_condensation_edge(u, v)] -= 1
        else:
            # (u,v) is an edge within the same SCC:
            # we remove it from the member edges of that SCC
            member_edges[self._edge_to_condensation_node(u, v)].discard((u, v))

    # Every edge of the expanded condensation starts with weight 0 ...
    weight_function_condensation_expanded = {e: 0 for e in self._condensation_expanded.edges()}

    # ... edges between SCCs weigh as many original edges as they still represent ...
    for u, v in self._condensation.edges:
        weight_function_condensation_expanded[self._condensation_edge_to_condensation_expanded_edge(u, v)] = edge_multiplicity[(u, v)]

    # ... and the edge (v, v_expanded) of a condensation node v gets weight 0 only when the SCC
    # originally had member edges and none of them is left (all were in edges_to_ignore);
    # in every other case it gets weight 1.
    original_member_edges = self._condensation.graph['member_edges']
    for node in self._condensation.nodes():
        all_member_edges_ignored = len(member_edges[str(node)]) == 0 and len(original_member_edges[str(node)]) > 0
        weight_function_condensation_expanded[(str(node), self._expanded(node))] = 0 if all_member_edges_ignored else 1

    utils.logger.debug(f"{__name__}: Condensation expanded graph: {self._condensation_expanded.edges()}")

    width = self._condensation_expanded.compute_max_edge_antichain(get_antichain=False, weight_function=weight_function_condensation_expanded)

    utils.logger.debug(f"{__name__}: Width of the condensation expanded graph: {width}")

    # Only the width of the full graph (no ignored edges) is safe to reuse across calls
    if nothing_ignored:
        self.condensation_width = width

    return width

is_scc_edge

is_scc_edge(u, v) -> bool

Returns True if (u,v) is an edge inside an SCC of self, False otherwise.

Source code in flowpaths/stdigraph.py
def is_scc_edge(self, u, v) -> bool:
    """
    Tells whether the edge (u,v) lies inside an SCC of self, i.e. whether both
    its endpoints are mapped to the same node of the condensation.

    Raises:
    -------
    - `ValueError`: If (u,v) is not an edge of the graph.
    """

    if (u, v) in self.edges():
        scc_of = self._condensation.graph['mapping']
        return scc_of[u] == scc_of[v]

    # (u,v) is not an edge of the graph: log the problem and reject the query
    utils.logger.error(f"{__name__}: Edge ({u},{v}) is not in the graph.")
    raise ValueError(f"Edge ({u},{v}) is not in the graph.")