Skip to content

k-Flow Decomposition

This class implements a solver for the problem of decomposing a flow into a given number \(k\) of paths (\(k\)-flow decomposition). This problem is a generalization of Minimum Flow Decomposition, in the sense that we are also given the number of paths that we need to decompose the flow in.

The class MinFlowDecomp uses this class internally to find the minimum value of \(k\) for which a \(k\)-flow decomposition exists.

Warning

Suppose that the number of paths of a minimum flow decomposition is \(k^*\). If we ask for a decomposition with \(k > k^*\) paths, this class will always return a decomposition with \(k\) paths, but some paths might have weight 0.

kFlowDecomp

kFlowDecomp(
    G: DiGraph,
    flow_attr: str,
    k: int,
    flow_attr_origin: str = "edge",
    weight_type: type = float,
    subpath_constraints: list = [],
    subpath_constraints_coverage: float = 1.0,
    subpath_constraints_coverage_length: float = None,
    length_attr: str = None,
    elements_to_ignore: list = [],
    optimization_options: dict = {},
    solver_options: dict = {},
)

Bases: AbstractPathModelDAG

Class to decompose a flow into a given number of weighted paths.

Initialize the Flow Decomposition model for a given number of paths k.

Parameters

  • G : nx.DiGraph

    The input directed acyclic graph, as networkx DiGraph.

  • flow_attr : str

    The attribute name from where to get the flow values on the edges.

  • k: int

    The number of paths to decompose in.

  • flow_attr_origin : str, optional

    The origin of the flow attribute. Default is "edge". Options:

    • "edge": the flow attribute is assumed to be on the edges of the graph.
    • "node": the flow attribute is assumed to be on the nodes of the graph. See the documentation on how node-weighted graphs are handled.
  • weight_type : type, optional

    The type of weights (int or float). Default is float.

  • subpath_constraints : list, optional

    List of subpath constraints. Default is an empty list. Each subpath constraint is a list of edges that must be covered by some solution path, according to the subpath_constraints_coverage or subpath_constraints_coverage_length parameters (see below).

  • subpath_constraints_coverage : float, optional

    Coverage fraction of the subpath constraints that must be covered by some solution paths.

    Defaults to 1.0, meaning that 100% of the edges (or nodes, if flow_attr_origin is "node") of the constraint need to be covered by some solution path). See subpath constraints documentation

  • subpath_constraints_coverage_length : float, optional

    Coverage length of the subpath constraints. Default is None. If set, this overrides subpath_constraints_coverage, and the coverage constraint is expressed in terms of the subpath constraint length. subpath_constraints_coverage_length is then the fraction of the total length of the constraint (specified via length_attr) needs to appear in some solution path. See subpath constraints documentation

  • length_attr : str, optional

    The attribute name from where to get the edge lengths (or node length, if flow_attr_origin is "node"). Defaults to None.

    • If set, then the subpath lengths (above) are in terms of the edge/node lengths specified in the length_attr field of each edge/node.
    • If set, and an edge/node has a missing edge length, then it gets length 1.
  • elements_to_ignore : list, optional

    List of edges (or nodes, if flow_attr_origin is "node") to ignore when adding constrains on flow explanation by the weighted paths. Default is an empty list. See ignoring edges documentation

  • optimization_options : dict, optional

    Dictionary with the optimization options. Default is None. See optimization options documentation. This class also supports the optimization "optimize_with_greedy": True (this is the default value). This will use a greedy algorithm to solve the problem, and if the number of paths returned by it equals a lowerbound on the solution size, then we know the greedy solution is optimum, and it will use that. The lowerbound used currently is the edge-width of the graph, meaning the minimum number of paths needed to cover all edges. This is a correct lowerbound because any flow decomposition must cover all edges, as they have non-zero flow.

  • solver_options : dict, optional

    Dictionary with the solver options. Default is None. See solver options documentation.

Raises

  • ValueError: If weight_type is not int or float.
  • ValueError: If some edge does not have the flow attribute specified as flow_attr.
  • ValueError: If the graph does not satisfy flow conservation on nodes different from source or sink.
  • ValueError: If the graph contains edges with negative (<0) flow values.
  • ValueError: If flow_attr_origin is not “node” or “edge”.
Source code in flowpaths/kflowdecomp.py
def __init__(
    self,
    G: nx.DiGraph,
    flow_attr: str,
    k: int,
    flow_attr_origin: str = "edge",
    weight_type: type = float,
    subpath_constraints: list = [],
    subpath_constraints_coverage: float = 1.0,
    subpath_constraints_coverage_length: float = None,
    length_attr: str = None,
    elements_to_ignore: list = [],
    optimization_options: dict = {},
    solver_options: dict = {},
):
    """
    Initialize the Flow Decomposition model for a given number of paths `k`.

    Parameters
    ----------
    - `G : nx.DiGraph`

        The input directed acyclic graph, as networkx DiGraph.

    - `flow_attr : str`

        The attribute name from where to get the flow values on the edges.

    - `k: int`

        The number of paths to decompose in.

    - `flow_attr_origin : str`, optional

        The origin of the flow attribute. Default is `"edge"`. Options:

        - `"edge"`: the flow attribute is assumed to be on the edges of the graph.
        - `"node"`: the flow attribute is assumed to be on the nodes of the graph. See [the documentation](node-expanded-digraph.md) on how node-weighted graphs are handled.

    - `weight_type : type`, optional

        The type of weights (`int` or `float`). Default is `float`.

    - `subpath_constraints : list`, optional

        List of subpath constraints. Default is an empty list. 
        Each subpath constraint is a list of edges that must be covered by some solution path, according 
        to the `subpath_constraints_coverage` or `subpath_constraints_coverage_length` parameters (see below).

    - `subpath_constraints_coverage : float`, optional

        Coverage fraction of the subpath constraints that must be covered by some solution paths. 

        Defaults to `1.0`, meaning that 100% of the edges (or nodes, if `flow_attr_origin` is `"node"`) of 
        the constraint need to be covered by some solution path). 
        See [subpath constraints documentation](subpath-constraints.md#3-relaxing-the-constraint-coverage)

    - `subpath_constraints_coverage_length : float`, optional

        Coverage length of the subpath constraints. Default is `None`. If set, this overrides `subpath_constraints_coverage`, 
        and the coverage constraint is expressed in terms of the subpath constraint length. 
        `subpath_constraints_coverage_length` is then the fraction of the total length of the constraint (specified via `length_attr`) needs to appear in some solution path.
        See [subpath constraints documentation](subpath-constraints.md#3-relaxing-the-constraint-coverage)

    - `length_attr : str`, optional

        The attribute name from where to get the edge lengths (or node length, if `flow_attr_origin` is `"node"`). Defaults to `None`.

        - If set, then the subpath lengths (above) are in terms of the edge/node lengths specified in the `length_attr` field of each edge/node.
        - If set, and an edge/node has a missing edge length, then it gets length 1.

    - `elements_to_ignore : list`, optional

        List of edges (or nodes, if `flow_attr_origin` is `"node"`) to ignore when adding constrains on flow explanation by the weighted paths. Default is an empty list. See [ignoring edges documentation](ignoring-edges.md)

    - `optimization_options : dict`, optional

        Dictionary with the optimization options. Default is `None`. See [optimization options documentation](solver-options-optimizations.md).
        This class also supports the optimization `"optimize_with_greedy": True` (this is the default value). This
        will use a greedy algorithm to solve the problem, and if the number of paths returned by it equals a lowerbound on the solution size,
        then we know the greedy solution is optimum, and it will use that. The lowerbound used currently is the edge-width of the graph,
        meaning the minimum number of paths needed to cover all edges. This is a correct lowerbound because any flow decomposition must cover all edges, 
        as they have non-zero flow.

    - `solver_options : dict`, optional

        Dictionary with the solver options. Default is `None`. See [solver options documentation](solver-options-optimizations.md).


    Raises
    ----------
    - ValueError: If `weight_type` is not int or float.
    - ValueError: If some edge does not have the flow attribute specified as `flow_attr`.
    - ValueError: If the graph does not satisfy flow conservation on nodes different from source or sink.
    - ValueError: If the graph contains edges with negative (<0) flow values.
    - ValueError: If `flow_attr_origin` is not "node" or "edge".
    """

    # Handling node-weighted graphs
    self.flow_attr_origin = flow_attr_origin
    if self.flow_attr_origin == "node":
        self.G_internal = nedg.NodeExpandedDiGraph(G, node_flow_attr=flow_attr, node_length_attr=length_attr)
        subpath_constraints_internal = self.G_internal.get_expanded_subpath_constraints(subpath_constraints)

        edges_to_ignore_internal = self.G_internal.edges_to_ignore
        if not all(isinstance(element_to_ignore, str) for element_to_ignore in elements_to_ignore):
            utils.logger.error(f"elements_to_ignore must be a list of nodes (i.e strings), not {elements_to_ignore}")
            raise ValueError(f"elements_to_ignore must be a list of nodes (i.e strings), not {elements_to_ignore}")
        edges_to_ignore_internal += [self.G_internal.get_expanded_edge(node) for node in elements_to_ignore]
        edges_to_ignore_internal = list(set(edges_to_ignore_internal))

    elif self.flow_attr_origin == "edge":
        self.G_internal = G
        subpath_constraints_internal = subpath_constraints
        if not all(isinstance(edge, tuple) and len(edge) == 2 for edge in elements_to_ignore):
            utils.logger.error(f"elements_to_ignore must be a list of edges (i.e. tuples of nodes), not {elements_to_ignore}")
            raise ValueError(f"elements_to_ignore must be a list of edges (i.e. tuples of nodes), not {elements_to_ignore}")
        edges_to_ignore_internal = elements_to_ignore
    else:
        utils.logger.error(f"flow_attr_origin must be either 'node' or 'edge', not {self.flow_attr_origin}")
        raise ValueError(f"flow_attr_origin must be either 'node' or 'edge', not {self.flow_attr_origin}")

    self.G = stdigraph.stDiGraph(self.G_internal)
    self.subpath_constraints = subpath_constraints_internal
    self.edges_to_ignore = self.G.source_sink_edges.union(edges_to_ignore_internal)

    if weight_type not in [int, float]:
        utils.logger.error(f"weight_type must be either int or float, not {weight_type}")
        raise ValueError(f"weight_type must be either int or float, not {weight_type}")
    self.weight_type = weight_type

    # Check requirements on input graph:
    # Check flow conservation only if there are no edges to ignore
    satisfies_flow_conservation = gu.check_flow_conservation(G, flow_attr)
    if len(edges_to_ignore_internal) == 0 and not satisfies_flow_conservation:
        utils.logger.error(f"{__name__}: The graph G does not satisfy flow conservation or some edges have missing `flow_attr`. This is an error, unless you passed `edges_to_ignore` to include at least those edges with missing `flow_attr`.")
        raise ValueError("The graph G does not satisfy flow conservation or some edges have missing `flow_attr`. This is an error, unless you passed `edges_to_ignore` to include at least those edges with missing `flow_attr`.")

    # Check that the flow is positive and get max flow value
    self.flow_attr = flow_attr
    self.w_max = self.weight_type(
        self.G.get_max_flow_value_and_check_non_negative_flow(
            flow_attr=self.flow_attr, edges_to_ignore=self.edges_to_ignore
        )
    )

    self.k = k

    self.subpath_constraints_coverage = subpath_constraints_coverage
    self.subpath_constraints_coverage_length = subpath_constraints_coverage_length
    self.length_attr = length_attr

    self.pi_vars = {}
    self.path_weights_vars = {}

    self.path_weights_sol = None
    self._solution = None
    self._lowerbound_k = None

    self.solve_statistics = {}
    self.optimization_options = optimization_options.copy() or {}

    greedy_solution_paths = None
    self.optimize_with_greedy = self.optimization_options.get("optimize_with_greedy", kFlowDecomp.optimize_with_greedy)
    self.optimize_with_flow_safe_paths = self.optimization_options.get("optimize_with_flow_safe_paths", kFlowDecomp.optimize_with_flow_safe_paths)

    # We can apply the greedy algorithm only if 
    # - there are no edges to ignore (in the original input graph), and 
    # - the graph satisfies flow conservation
    if self.optimize_with_greedy and len(edges_to_ignore_internal) == 0 and satisfies_flow_conservation:
        if self._get_solution_with_greedy():
            greedy_solution_paths = self._solution["paths"]
            self.optimization_options["external_solution_paths"] = greedy_solution_paths

    if self.optimize_with_flow_safe_paths and satisfies_flow_conservation:
        start_time = time.perf_counter()
        self.optimization_options["external_safe_paths"] = sfd.compute_flow_decomp_safe_paths(G=G, flow_attr=self.flow_attr)
        self.solve_statistics["flow_safe_paths_time"] = time.perf_counter() - start_time
        # If we optimize with flow safe paths, we need to disable optimizing with safe paths and sequences
        if self.optimization_options.get("optimize_with_safe_paths", False):
            utils.logger.error(f"{__name__}: Cannot optimize with both flow safe paths and safe paths")
            raise ValueError("Cannot optimize with both flow safe paths and safe paths")
        if self.optimization_options.get("optimize_with_safe_sequences", False):
            utils.logger.error(f"{__name__}: Cannot optimize with both flow safe paths and safe sequences")
            raise ValueError("Cannot optimize with both flow safe paths and safe sequences")

    self.optimization_options["trusted_edges_for_safety"] = self.G.get_non_zero_flow_edges(flow_attr=self.flow_attr, edges_to_ignore=self.edges_to_ignore)

    # Call the constructor of the parent class AbstractPathModelDAG
    super().__init__(
        G=self.G, 
        k=self.k,
        subpath_constraints=self.subpath_constraints, 
        subpath_constraints_coverage=self.subpath_constraints_coverage, 
        subpath_constraints_coverage_length=self.subpath_constraints_coverage_length,
        length_attr=self.length_attr, 
        optimization_options=self.optimization_options,
        solver_options=solver_options,
        solve_statistics=self.solve_statistics,
    )

    # If already solved with a previous method, we don't create solver, not add paths
    if self.is_solved():
        return

    # This method is called from the super class AbstractPathModelDAG
    self.create_solver_and_paths()

    # This method is called from the current class to encode the flow decomposition
    self._encode_flow_decomposition()

    # The given weights optimization
    self._encode_given_weights()

    utils.logger.info(f"{__name__}: initialized with graph id = {utils.fpid(G)}, k = {self.k}")

get_solution

get_solution(
    remove_empty_paths=False,
)

Retrieves the solution for the flow decomposition problem.

If the solution has already been computed and cached as self.solution, it returns the cached solution. Otherwise, it checks if the problem has been solved, computes the solution paths and weights, and caches the solution.

Parameters
  • remove_empty_paths: bool, optional

    If True, removes empty paths from the solution. Default is False. These can happen only if passed the optimization option "allow_empty_paths" : True.

Returns
  • solution: dict

    A dictionary containing the solution paths (key "paths") and their corresponding weights (key "weights").

Raises
  • exception If model is not solved.
Source code in flowpaths/kflowdecomp.py
def get_solution(self, remove_empty_paths=False):
    """
    Retrieves the solution for the flow decomposition problem.

    If the solution has already been computed and cached as `self.solution`, it returns the cached solution.
    Otherwise, it checks if the problem has been solved, computes the solution paths and weights,
    and caches the solution.

    Parameters
    ----------

    - `remove_empty_paths: bool`, optional

        If `True`, removes empty paths from the solution. Default is `False`. These can happen only if passed the optimization option `"allow_empty_paths" : True`.

    Returns
    -------
    - `solution: dict`

        A dictionary containing the solution paths (key `"paths"`) and their corresponding weights (key `"weights"`).

    Raises
    ------
    - `exception` If model is not solved.
    """

    if self._solution is not None:            
        return self._remove_empty_paths(self._solution) if remove_empty_paths else self._solution

    self.check_is_solved()
    weights_sol_dict = self.solver.get_variable_values("w", [int])
    self.path_weights_sol = [
        (
            round(weights_sol_dict[i])
            if self.weight_type == int
            else float(weights_sol_dict[i])
        )
        for i in range(self.k)
    ]

    if self.flow_attr_origin == "edge":
        self._solution = {
            "paths": self.get_solution_paths(),
            "weights": self.path_weights_sol,
        }
    elif self.flow_attr_origin == "node":
        self._solution = {
            "_paths_internal": self.get_solution_paths(),
            "paths": self.G_internal.get_condensed_paths(self.get_solution_paths()),
            "weights": self.path_weights_sol,
        }

    return self._remove_empty_paths(self._solution) if remove_empty_paths else self._solution

is_valid_solution

is_valid_solution(
    tolerance=0.001,
)

Checks if the solution is valid by comparing the flow from paths with the flow attribute in the graph edges.

Raises
  • ValueError: If the solution is not available (i.e., self.solution is None).
Returns
  • bool: True if the solution is valid, False otherwise.
Notes
  • get_solution() must be called before this method.
  • The solution is considered valid if the flow from paths is equal (up to TOLERANCE * num_paths_on_edges[(u, v)]) to the flow value of the graph edges.
Source code in flowpaths/kflowdecomp.py
def is_valid_solution(self, tolerance=0.001):
    """
    Checks if the solution is valid by comparing the flow from paths with the flow attribute in the graph edges.

    Raises
    ------
    - ValueError: If the solution is not available (i.e., self.solution is None).

    Returns
    -------
    - bool: True if the solution is valid, False otherwise.

    Notes
    -------
    - get_solution() must be called before this method.
    - The solution is considered valid if the flow from paths is equal
        (up to `TOLERANCE * num_paths_on_edges[(u, v)]`) to the flow value of the graph edges.
    """

    if self._solution is None:
        utils.logger.error(f"{__name__}: Solution is not available. Call get_solution() first.")
        raise ValueError("Solution is not available. Call get_solution() first.")

    solution_paths = self._solution.get("_paths_internal", self._solution["paths"])
    solution_weights = self._solution["weights"]
    solution_paths_of_edges = [
        [(path[i], path[i + 1]) for i in range(len(path) - 1)]
        for path in solution_paths
    ]

    flow_from_paths = {(u, v): 0 for (u, v) in self.G.edges()}
    num_paths_on_edges = {e: 0 for e in self.G.edges()}
    for weight, path in zip(solution_weights, solution_paths_of_edges):
        for e in path:
            flow_from_paths[e] += weight
            num_paths_on_edges[e] += 1

    for u, v, data in self.G.edges(data=True):
        if self.flow_attr in data and (u,v) not in self.edges_to_ignore:
            if (
                abs(flow_from_paths[(u, v)] - data[self.flow_attr])
                > tolerance * num_paths_on_edges[(u, v)]
            ):
                utils.logger.debug(f"Flow validation failed for edge ({u}, v): expected {data[self.flow_attr]}, got {flow_from_paths[(u, v)]}")
                return False

    return True