Skip to content

Minimum Flow Decomposition in General Graphs

1. Definition

The Minimum Flow Decomposition problem on a directed graph (possibly with cycles) is defined as follows. For a walk \(W\) and an edge \((u,v)\), we denote by \(W(u,v)\) the number of times that the walk goes through the edge \((u,v)\). If \(W(u,v)\) does not contain \((u,v)\) , then \(W(u,v) = 0\).

  • INPUT:

    • A directed graph \(G = (V,E)\).
    • Node subsets \(S \subseteq V\) and \(T \subseteq V\), where the walks are allowed to start and allowed to end, respectively.
    • A flow on \(G\), namely weights \(f(u,v)\) for every edge \((u,v)\) of \(G\), such that for every node \(v\) that is not in \(S\) or \(T\), it holds that the sum of the flow values entering \(v\) equals the sum of the flow values exiting \(v\). This property is called flow conservation.
    • Note that for flow conservation to hold, all graph sources (i.e. nodes without incoming edges) must be in \(S\), and all graph sinks (i.e. nodes without outgoing edges) must be in \(T\).
  • OUTPUT: A minimum number \(k\) of walks \(W_1,\dots,W_k\), starting in some node in \(S\) and ending in some node in \(T\), and a weight \(w_i\) associated to each \(W_i\), such that for every edge of the graph it holds that its flow value equals the sum of the weights of the walks going through the edge. Formally, $$ f(u,v) = \sum_{i \in \{1,\dots,k\}}w_i \cdot W_i(u,v), ~~\forall (u,v) \in E. $$

Note

  • This class support also graphs with flow values on nodes. Set the parameter flow_attr_origin = "node". For details on how these are handled internally, see Handling graphs with flows / weights on nodes.
  • The graph may have more than one source or sink nodes, in which case the solution paths are just required to start in any source node, and end in any sink node.

For example, the directed graph below satisfies the flow conservation property, with \(S = \{s\}\) and \(T = \{t\}\):

flowchart LR
    s((s))
    a((a))
    b((b))
    c((c))
    d((d))
    e((e))
    f((f))
    g((g))
    h((h))
    t((t))
    s -->|3| a
    a -->|3| t
    s -->|6| b
    b -->|2| a
    a -->|2| h
    h -->|6| t
    b -->|4| c
    c -->|4| h
    c -->|4| d
    d -->|4| e
    e -->|4| c
    e -->|8| f
    f -->|8| g
    g -->|8| e

A decomposition into 3 walks, in red, orange and blue, of weights 4, 3 and 2, respectively is shown below. There is no decomposition into a smaller number of \(s\)-\(t\) walks, and thus this decomposition is also a minimum flow decomposition.

Note that the orange and blue walks do not repeat nodes or edges (they are paths), but the red walk \(s\), \(b\), \(c\), \(d\), \(e\), \(f\), \(g\), \(e\), \(f\), \(g\), \(e\), \(c\), \(h\), \(t\) is a proper walks in the sense that repeats both nodes and edges.

flowchart LR
    s((s))
    a((a))
    b((b))
    c((c))
    d((d))
    e((e))
    f((f))
    g((g))
    h((h))
    t((t))
    s -->|3| a
    a -->|3| t
    s -->|2| b
    s -->|4| b
    b -->|2| a
    a -->|2| h
    h -->|2| t
    h -->|4| t
    b -->|4| c
    c -->|4| h
    c -->|4| d
    d -->|4| e
    e -->|4| c
    e -->|4| f
    f -->|4| g
    g -->|4| e
    e -->|4| f
    f -->|4| g
    g -->|4| e
    linkStyle 0,1 stroke:orange,stroke-width:3;
    linkStyle 2,4,5,6 stroke:blue,stroke-width:3;
    linkStyle 3,7,8,9,10,11,12,13,14,15,16,17,18 stroke:red,stroke-width:3;

2. Solving the problem

We create the graph as a networkx DiGraph. In real project, you will likely have a method that transforms your graph to a DiGraph. We also give an attribute flow for every edge storing its flow value.

import flowpaths as fp
import networkx as nx

graph = nx.DiGraph()
graph.add_edge("s", "a", flow=3)
graph.add_edge("a", "t", flow=3)
graph.add_edge("s", "b", flow=6)
graph.add_edge("b", "a", flow=2)
graph.add_edge("a", "h", flow=2)
graph.add_edge("h", "t", flow=6)
graph.add_edge("b", "c", flow=4)
graph.add_edge("c", "d", flow=4)
graph.add_edge("c", "h", flow=4)
graph.add_edge("d", "h", flow=0)
graph.add_edge("d", "e", flow=4)
graph.add_edge("e", "c", flow=4)
graph.add_edge("e", "f", flow=8)
graph.add_edge("f", "g", flow=8)
graph.add_edge("g", "e", flow=8)
We now create a Minimum Flow Decomposition solver with default settings, by specifying that the flow value of each edge is in the attribute flow of the edges. Note that MinFlowDecompCycles just creates the model. You need to call solve() to solve it.

mfd_model = fp.MinFlowDecompCycles(G=graph, flow_attr="flow")
mfd_model.solve()

The model might not be solved because the MILP solver couldn’t do it in the time it had allocated, or other problems. Thus, you need to check if it was solved, and then get its solution. The solution of MinFlowDecompCycles is a dictionary, with a key 'walks', and a key 'weights':

if mfd_model.is_solved():
    solution = mfd_model.get_solution()
    print(solution)
    # {'walks': [
    #   ['s', 'b', 'c', 'd', 'e', 'f', 'g', 'e', 'f', 'g', 'e', 'c', 'h', 't'], 
    #   ['s', 'a', 't'], 
    #   ['s', 'b', 'a', 'h', 't']]
    # 'weights': [4, 3, 2]}

3. References

There are several works on this problem, for example.

  1. Vatinlen, Benedicte, et al. Simple bounds and greedy algorithms for decomposing a flow into a minimal set of paths. European Journal of Operational Research 185.3 (2008): 1390-1401.

  2. Fernando H. C. Dias, Lucia Williams, Brendan Mumey, Alexandru I. Tomescu Minimum Flow Decomposition in Graphs with Cycles using Integer Linear Programming, arXiv, 2022

  3. See also flowpaths References, and the other papers cited by these works.

MinFlowDecompCycles

MinFlowDecompCycles(
    G: DiGraph,
    flow_attr: str,
    flow_attr_origin: str = "edge",
    weight_type: type = int,
    subset_constraints: list = [],
    subset_constraints_coverage: float = 1.0,
    elements_to_ignore: list = [],
    additional_starts: list = [],
    additional_ends: list = [],
    optimization_options: dict = {},
    solver_options: dict = {},
)

Bases: AbstractWalkModelDiGraph

A class to decompose a network flow if a general directed graph into a minimum number of weighted s-t walks.

Parameters

  • G : nx.DiGraph

    The input directed graph, as networkx DiGraph, possibly with cycles.

  • flow_attr : str

    The attribute name from where to get the flow values on the edges.

  • flow_attr_origin : str, optional

    The origin of the flow attribute. Default is "edge". Options:

    • "edge": the flow attribute is assumed to be on the edges of the graph.
    • "node": the flow attribute is assumed to be on the nodes of the graph. See the documentation on how node-weighted graphs are handled.
  • weight_type : type, optional

    The type of weights (int or float). Default is int.

  • subset_constraints : list, optional

    List of subset constraints. Default is an empty list. Each subset constraint is a list of edges that must be covered by some solution walks, in any order, according to the subset_constraints_coverage parameter (see below).

  • subset_constraints_coverage: float, optional

    Coverage fraction of the subset constraints that must be covered by some solution walk.

    Defaults to 1.0, meaning that 100% of the edges (or nodes, if flow_attr_origin is "node") of the constraint need to be covered by some solution walk). See subset constraints documentation

  • elements_to_ignore : list, optional

    List of edges (or nodes, if flow_attr_origin is "node") to ignore when adding constrains on flow explanation by the weighted walks. Default is an empty list. See ignoring edges documentation

  • additional_starts: list, optional

    List of additional start nodes of the walks. Default is an empty list. See additional start/end nodes documentation.

  • additional_ends: list, optional

    List of additional end nodes of the walks. Default is an empty list. See additional start/end nodes documentation.

  • optimization_options : dict, optional

    Dictionary with the optimization options. Default is an empty dict. See optimization options documentation.

  • solver_options : dict, optional

    Dictionary with the solver options. Default is {}. See solver options documentation.

Raises

ValueError

  • If weight_type is not int or float.
  • If some edge does not have the flow attribute specified as flow_attr.
  • If the graph does not satisfy flow conservation on nodes different from source or sink.
  • If the graph contains edges with negative (<0) flow values.
  • If flow_attr_origin is not “node” or “edge”.
Source code in flowpaths/minflowdecompcycles.py
def __init__(
    self,
    G: nx.DiGraph,
    flow_attr: str,
    flow_attr_origin: str = "edge",
    weight_type: type = int,
    subset_constraints: list = [],
    subset_constraints_coverage: float = 1.0,
    elements_to_ignore: list = [],
    additional_starts: list = [],
    additional_ends: list = [],
    optimization_options: dict = {},
    solver_options: dict = {},
):
    """
    Parameters
    ----------
    - `G : nx.DiGraph`

        The input directed graph, as [networkx DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html), possibly with cycles.

    - `flow_attr : str`

        The attribute name from where to get the flow values on the edges.

    - `flow_attr_origin : str`, optional

        The origin of the flow attribute. Default is `"edge"`. Options:

        - `"edge"`: the flow attribute is assumed to be on the edges of the graph.
        - `"node"`: the flow attribute is assumed to be on the nodes of the graph. See [the documentation](node-expanded-digraph.md) on how node-weighted graphs are handled.

    - `weight_type : type`, optional

        The type of weights (`int` or `float`). Default is `int`.

    - `subset_constraints : list`, optional

        List of subset constraints. Default is an empty list. 
        Each subset constraint is a list of edges that must be covered by some solution walks, in any order, according 
        to the `subset_constraints_coverage` parameter (see below).

    - `subset_constraints_coverage: float`, optional

        Coverage fraction of the subset constraints that must be covered by some solution walk. 

        Defaults to `1.0`, meaning that 100% of the edges (or nodes, if `flow_attr_origin` is `"node"`) of
        the constraint need to be covered by some solution walk).
        See [subset constraints documentation](subset-constraints.md#3-relaxing-the-constraint-coverage)

    - `elements_to_ignore : list`, optional

        List of edges (or nodes, if `flow_attr_origin` is `"node"`) to ignore when adding constrains on flow explanation by the weighted walks. 
        Default is an empty list. See [ignoring edges documentation](ignoring-edges.md)

    - `additional_starts: list`, optional

        List of additional start nodes of the walks. Default is an empty list. See [additional start/end nodes documentation](additional-start-end-nodes.md).

    - `additional_ends: list`, optional

        List of additional end nodes of the walks. Default is an empty list. See [additional start/end nodes documentation](additional-start-end-nodes.md).

    - `optimization_options : dict`, optional

        Dictionary with the optimization options. Default is an empty dict. See [optimization options documentation](solver-options-optimizations.md).

    - `solver_options : dict`, optional

        Dictionary with the solver options. Default is `{}`. See [solver options documentation](solver-options-optimizations.md).

    Raises
    ------
    `ValueError`

    - If `weight_type` is not `int` or `float`.
    - If some edge does not have the flow attribute specified as `flow_attr`.
    - If the graph does not satisfy flow conservation on nodes different from source or sink.
    - If the graph contains edges with negative (<0) flow values.
    - If `flow_attr_origin` is not "node" or "edge".
    """

    # Handling node-weighted graphs
    self.flow_attr_origin = flow_attr_origin
    if self.flow_attr_origin == "node":
        if G.number_of_nodes() == 0:
            utils.logger.error(f"{__name__}: The input graph G has no nodes. Please provide a graph with at least one node.")
            raise ValueError(f"The input graph G has no nodes. Please provide a graph with at least one node.")
        if len(additional_starts) + len(additional_ends) == 0:
            self.G_internal = nedg.NodeExpandedDiGraph(
                G=G, 
                node_flow_attr=flow_attr
            )
        else:
            self.G_internal = nedg.NodeExpandedDiGraph(
                G=G, 
                node_flow_attr=flow_attr,
                additional_starts=additional_starts,
                additional_ends=additional_ends,
            )
        subset_constraints_internal = self.G_internal.get_expanded_subpath_constraints(subset_constraints)
        additional_starts_internal = self.G_internal.get_expanded_additional_starts(additional_starts)
        additional_ends_internal = self.G_internal.get_expanded_additional_ends(additional_ends)

        edges_to_ignore_internal = self.G_internal.edges_to_ignore
        if not all(isinstance(element_to_ignore, str) for element_to_ignore in elements_to_ignore):
            utils.logger.error(f"elements_to_ignore must be a list of nodes (i.e strings), not {elements_to_ignore}")
            raise ValueError(f"elements_to_ignore must be a list of nodes (i.e strings), not {elements_to_ignore}")
        edges_to_ignore_internal += [self.G_internal.get_expanded_edge(node) for node in elements_to_ignore]
        edges_to_ignore_internal = list(set(edges_to_ignore_internal))

    elif self.flow_attr_origin == "edge":
        if G.number_of_edges() == 0:
            utils.logger.error(f"{__name__}: The input graph G has no edges. Please provide a graph with at least one edge.")
            raise ValueError(f"The input graph G has no edges. Please provide a graph with at least one edge.")
        if len(additional_starts) + len(additional_ends) > 0:
            utils.logger.error(f"additional_starts and additional_ends are not supported when flow_attr_origin is 'edge'.")
            raise ValueError(f"additional_starts and additional_ends are not supported when flow_attr_origin is 'edge'.")
        self.G_internal = G
        subset_constraints_internal = subset_constraints
        if not all(isinstance(edge, tuple) and len(edge) == 2 for edge in elements_to_ignore):
            utils.logger.error(f"elements_to_ignore must be a list of edges (i.e. tuples of nodes), not {elements_to_ignore}")
            raise ValueError(f"elements_to_ignore must be a list of edges (i.e. tuples of nodes), not {elements_to_ignore}")
        edges_to_ignore_internal = elements_to_ignore
        additional_starts_internal = additional_starts
        additional_ends_internal = additional_ends

    else:
        utils.logger.error(f"flow_attr_origin must be either 'node' or 'edge', not {self.flow_attr_origin}")
        raise ValueError(f"flow_attr_origin must be either 'node' or 'edge', not {self.flow_attr_origin}")

    self.G = self.G_internal
    self.subset_constraints = subset_constraints_internal
    self.edges_to_ignore = edges_to_ignore_internal
    self.additional_starts = additional_starts_internal
    self.additional_ends = additional_ends_internal

    self.flow_attr = flow_attr
    self.weight_type = weight_type
    self.subset_constraints_coverage = subset_constraints_coverage
    self.optimization_options = optimization_options
    self.solver_options = solver_options
    self.time_limit = self.solver_options.get("time_limit", sw.SolverWrapper.time_limit)
    self.solve_time_start = None

    self.solve_statistics = {}
    self._solution = None
    self._lowerbound_k = None
    self._is_solved = None

    # Get the max flow value on an edge
    self.w_max = max(self.G.edges[edge][self.flow_attr] 
                      for edge in self.G.edges 
                      if self.flow_attr in self.G.edges[edge]
                      and edge not in self.edges_to_ignore) if self.G.number_of_edges() > 0 else 0

    # Internal variables
    self._generating_set = None
    self._given_weights_model: kflowdecompcycles.kFlowDecompCycles | None = None
    self._mingenset_model: mgs.MinGenSet | None = None
    self._source_flow = None

    utils.logger.info(f"{__name__}: initialized with graph id = {utils.fpid(G)}")

get_solution

get_solution()

Retrieves the solution for the flow decomposition problem.

Returns
  • solution: dict

    A dictionary containing the solution walks (key "walks") and their corresponding weights (key "weights").

Raises
  • exception If model is not solved.
Source code in flowpaths/minflowdecompcycles.py
def get_solution(self):
    """
    Retrieves the solution for the flow decomposition problem.

    Returns
    -------
    - `solution: dict`

        A dictionary containing the solution walks (key `"walks"`) and their corresponding weights (key `"weights"`).

    Raises
    -------
    - `exception` If model is not solved.
    """
    self.check_is_solved()
    return self._solution

solve

solve() -> bool

Attempts to solve the flow decomposition problem using a model with varying number of walks.

This method iterates over a range of possible walk numbers, creating and solving a flow decomposition model for each count. If a solution is found, it stores the solution and relevant statistics, and returns True. If no solution is found after iterating through all possible walk numbers, it returns False.

Returns:

Name Type Description
bool bool

True if a solution is found, False otherwise.

Note

This overloads the solve() method from AbstractWalkModelDiGraph class.

Source code in flowpaths/minflowdecompcycles.py
def solve(self) -> bool:
    """
    Attempts to solve the flow decomposition problem using a model with varying number of walks.

    This method iterates over a range of possible walk numbers, creating and solving a flow decomposition model for each count.
    If a solution is found, it stores the solution and relevant statistics, and returns True. If no solution is found after
    iterating through all possible walk numbers, it returns False.

    Returns:
        bool: True if a solution is found, False otherwise.

    Note:
        This overloads the `solve()` method from `AbstractWalkModelDiGraph` class.
    """
    self.solve_time_start = time.perf_counter()

    if self.optimization_options.get("optimize_with_given_weights", MinFlowDecompCycles.optimize_with_given_weights):            
        self._solve_with_given_weights()

    for i in range(self.get_lowerbound_k(), self.G.number_of_edges()):
        utils.logger.info(f"{__name__}: solving with k = {i}")
        fd_model = None
        # Checking if we have already found a solution with the same number of walks
        # via the min gen set and given weights approach
        if self._given_weights_model is not None and self._given_weights_model.is_solved():
            if len(self._given_weights_model.get_solution(remove_empty_walks=True)["walks"]) == i:
                fd_model = self._given_weights_model

        fd_solver_options = copy.deepcopy(self.solver_options)
        fd_solver_options["time_limit"] = self.time_limit - self.solve_time_elapsed

        if fd_model is None:
            fd_model = kflowdecompcycles.kFlowDecompCycles(
                G=self.G,
                flow_attr=self.flow_attr,
                k=i,
                weight_type=self.weight_type,
                subset_constraints=self.subset_constraints,
                subset_constraints_coverage=self.subset_constraints_coverage,
                elements_to_ignore=self.edges_to_ignore,
                additional_starts=self.additional_starts,
                additional_ends=self.additional_ends,
                optimization_options=self.optimization_options,
                solver_options=fd_solver_options,
            )
            fd_model.solve()

            # If the previous run exceeded the time limit, 
            # we still stop the search, even if we might have managed to solve it
            if self.solve_time_elapsed > self.time_limit:
                return False

        if fd_model.is_solved():
            self._solution = fd_model.get_solution(remove_empty_walks=True)
            if self.flow_attr_origin == "node":
                # If the flow_attr_origin is "node", we need to convert the solution walks from the expanded graph to walks in the original graph.
                self._solution["_walks_internal"] = self._solution["walks"]
                self._solution["walks"] = self.G_internal.get_condensed_paths(self._solution["walks"])
            self.set_solved()
            self.solve_statistics = fd_model.solve_statistics
            # we overwrite solve_time with the total time MFD has taken
            self.solve_statistics["solve_time"] = time.perf_counter() - self.solve_time_start
            self.solve_statistics["min_gen_set_solve_time"] = self._mingenset_model.solve_statistics.get("total_solve_time", 0) if self._mingenset_model is not None else 0
            self.fd_model = fd_model
            return True
        elif fd_model.solver.get_model_status() == sw.SolverWrapper.infeasible_status:
            utils.logger.info(f"{__name__}: model is infeasible for k = {i}")
        else:
            # If the model is not solved and the status is not infeasible,
            # it means that the solver stopped because of an unexpected termination,
            # thus we cannot conclude that the model is infeasible.
            # In this case, we stop the search.
            return False

    return False