Skip to content

Minimum Correction of Weights to a Flow

Often, the edge weights of a graph are not a flow (i.e. do not satisfy flow conservation for non- source/sink nodes). While the models k-Minimum Path Error or k-Least Absolute Errors can decompose such graphs, as a less principled approach, one can first minimally correct the graph weights to become a flow, and then optimally decompose the resulting flow using the Minimum Flow Decomposition model.

This is faster in practice, because the Minimum Flow Decomposition solver is faster than the ones decomposing graphs without flow conservation. We are thus delegating error correction to a pre-processing step, and then avoiding the error-handling difficulty when decomposing the resulting graph.

1. Definition

This class solves the following problem.

  • INPUT: A directed graph \(G = (V,E)\) and a weight \(w(u,v)\) for every edge \((u,v)\) of \(G\). Weights do not necessarily satisfy flow conservation.

  • OUTPUT: A flow value \(f(u,v)\) for every edge \((u,v) \in E\), satisfying the flow conservation property for all non-source and non-sink nodes \(v\): $$ \sum_{(u,v) \in E} f(u,v) = \sum_{(v,u) \in E} f(v,u), $$ minimizing the sum of absolute errors: $$ \sum_{(u,v) \in E}\Big|f(u,v) - w(u,v)\Big|. $$

2. Examples

Let’s consider the following weighted graph, whose weights do not satisfy flow conservation, because the edges shown in brown are erroneous.

flowchart LR
    s((s))
    a((a))
    b((b))
    c((c))
    d((d))
    t((t))
    s -->|7| a
    a -->|2| b
    s -->|7| b
    a -->|4| c
    b -->|9| c
    c -->|7| d
    d -->|6| t
    c -->|7| t

    linkStyle 0,5 stroke:brown,stroke-width:3;

Let’s then minimally correct its weights with default settings.

import flowpaths as fp
import networkx as nx

graph = nx.DiGraph()
graph.add_edge("s", "a", flow=7)
graph.add_edge("s", "b", flow=7)
graph.add_edge("a", "b", flow=2)
graph.add_edge("a", "c", flow=4)
graph.add_edge("b", "c", flow=9)
graph.add_edge("c", "d", flow=7)
graph.add_edge("c", "t", flow=7)
graph.add_edge("d", "t", flow=6)

# We create a the Minimum Error Flow solver with default settings
correction_model = fp.MinErrorFlow(graph, flow_attr="flow")
correction_model.solve()

The resulting graph is below. The edges whose weights have been corrected are in green.

flowchart LR
    s((s))
    a((a))
    b((b))
    c((c))
    d((d))
    t((t))
    s -->|6| a
    a -->|2| b
    s -->|7| b
    a -->|4| c
    b -->|9| c
    c -->|6| d
    d -->|6| t
    c -->|7| t

    linkStyle 0,5 stroke:green,stroke-width:3;

Once we have corrected the weights, we can get the corrected graph and e.g. apply the minimum flow decomposition model on it.

if correction_model.is_solved():
    corrected_graph = correction_model.get_corrected_graph()
    # This is equivalent to: corrected_graph = correction_model.get_solution()["graph"]
    mfd_model = fp.MinFlowDecomp(corrected_graph, flow_attr="flow")
    mfd_model.solve()
    if mfd_model.is_solved():
        print(mfd_model.get_solution())
        # {'paths': [
        #   ['s', 'b', 'c', 't'], 
        #   ['s', 'a', 'c', 'd', 't'], 
        #   ['s', 'a', 'b', 'c', 'd', 't']], 
        # 'weights': [7.0, 4.0, 2.0]}

This gives the following paths.

flowchart LR
    s((s))
    a((a))
    b((b))
    c((c))
    d((d))
    t((t))
    s -->|4| a
    a -->|4| c
    c -->|4| d
    d -->|4| t
    linkStyle 0,1,2,3 stroke:red,stroke-width:3;
    s -->|2| a
    a -->|2| b
    b -->|2| c
    c -->|2| d
    d -->|2| t
    linkStyle 4,5,6,7,8 stroke:orange,stroke-width:3;
    s -->|7| b
    b -->|7| c
    c -->|7| t
    linkStyle 9,10,11 stroke:blue,stroke-width:3;

3. Generalizations

This class implements a more general version, as follows:

  1. For acyclic graphs, the corrected flow can start/end not only in source/sink nodes, but also in given sets of start/end nodes (set parameters additional_starts and additional_ends). See also Additional start/end nodes.
  2. The error can count only for a given subset \(E' \subseteq E\) of the edges (set parameter edges_to_ignore to be \(E \setminus E'\)). See also ignoring edges documentation.
  3. The error (i.e. the above absolute of the difference) of every edge can contribute differently to the objective function, according to a scale factor \(\in [0,1]\). Set these via a dictionary that you pass to edge_error_scaling, which stores the scale factor \(\lambda_{(u,v)} \in [0,1]\) of each edge \((u,v)\) in the dictionary. Setting \(\lambda_{(u,v)} = 0\) will add the edge \((u,v)\) to edges_to_ignore, because the constraint for \((u,v)\) becomes always true. See also ignoring edges documentation.
  4. For acyclic graphs, one can also ensure some “sparsity” in the solution, meaning the total corrected flow exiting the source node is counts also in the minimization function, with a given multiplier \(\lambda\) (see ref. [2]). If \(\lambda = 0\), this has no effect.

Generalized objective function

Formally, the objective function generalized as in 2., 3. and 4. above is: $$ \sum_{(u,v) \in E’}\lambda_{(u,v)} \cdot \Big|f(u,v) - w(u,v)\Big| + \lambda \sum_{(s,v) \in E} f(s,v). $$

4. References

  1. Alexandru I. Tomescu, Anna Kuosmanen, Romeo Rizzi, Veli Mäkinen A novel min-cost flow method for estimating transcript expression with RNA-Seq BMC Bioinformatics 14(S-5), S15, 2013 (preprint)
  2. Elsa Bernard, Laurent Jacob, Julien Mairal, Jean-Philippe Vert Efficient RNA isoform identification and quantification from RNA-Seq data with network flows, Bioinformatics, Volume 30, Issue 17, September 2014, Pages 2447–2455

MinErrorFlow

MinErrorFlow(
    G: DiGraph,
    flow_attr: str,
    weight_type: type = float,
    sparsity_lambda: float = 0,
    few_flow_values_epsilon: float = None,
    edges_to_ignore: list = [],
    edge_error_scaling: dict = {},
    additional_starts: list = [],
    additional_ends: list = [],
    solver_options: dict = {},
)

This class implements a method to optimally correct the weights of a directed acyclic graph, so that:

  • The resulting weights become a flow, i.e. they become a non-negative flow, namely they satisfy the flow conservation constraints.
  • The resulting weights are as close as possible to the original weights, i.e. the sum of the absolute difference between an edge weight and the corrected flow value of the edge, for all edges, is minimized.

Parameters

  • G: networkx.DiGraph

    The directed graph to be corrected (which does not need to be acyclic).

  • flow_attr: str

    The name of the attribute in the edges of the graph that contains the weight of the edge.

  • weight_type: type, optional

    The type of the weights of the edges. It can be either int or float. Default is float.

  • sparsity_lambda: float, optional

    The sparsity parameter, used to control the trade-off between the sparsity of the solution and the closeness to the original weights. Default is 0. If sparsity_lambda is set to 0, then the solution will be as close as possible to the original weights. If sparsity_lambda is set to a positive value, then the solution will be sparser, i.e. it will have less flow going out of the source. The higher the value of sparsity_lambda, the sparser the solution will be. You can use a value different than 0 only for acyclic graphs. If you set it to a value different than 0 for a cyclic graph, the class will raise an error.**

  • few_flow_values_epsilon: float, optional

    The epsilon value (at least zero) used to control the number of distinct values in the corrected flow. If few_flow_values_epsilon is set to None, then the solution will be as close as possible to the original weights, and there is no bound on the number of distinct values in the corrected flow. If few_flow_values_epsilon is set to a positive value, then the solution will have fewer distinct flow values in the corrected flow, while ensuring that the objective value of the resulting problem is within \((1+ arepsilon)\) of the optimal solution (with this parameter set to None). The higher the value of few_flow_values_epsilon, the smaller the number of flow values in the corrected flow, but possibly higher the sum of edge errors in the corrected flow. Default is None.

    Warning

    Setting this can be slow on larger graphs.

  • edges_to_ignore: list, optional

    A list of edges to ignore. The weights of these edges will still be corrected, but their error will not count in the objective function that is being minimized. Default is []. See ignoring edges documentation

  • edge_error_scaling: dict, optional

    Dictionary edge: factor storing the error scale factor (in [0,1]) of every edge, which scale the allowed difference between edge weight and path weights. Default is an empty dict. If an edge has a missing error scale factor, it is assumed to be 1. The factors are used to scale the difference between the flow value of the edge and the sum of the weights of the paths going through the edge. See ignoring edges documentation

  • additional_starts: list, optional

    A list of nodes to be added as additional sources. This applies only to acyclic graphs. Flow is allowed to start start at these nodes, meaning that their out going flow can be greater than their incoming flow. Default is []. See also additional start/end nodes documentation.

  • additional_ends: list, optional

    A list of nodes to be added as additional sinks. This applies only to acyclic graphs. Flow is allowed to end at these nodes, meaning that their incoming flow can be greater than their outgoing flow. Default is []. See also additional start/end nodes documentation.

  • solver_options: dict, optional

    A dictionary containing the options for the solver. The options are passed to the solver wrapper. Default is {}. See solver options documentation.

Source code in flowpaths/minerrorflow.py
def __init__(
        self, 
        G: nx.DiGraph,
        flow_attr: str,
        weight_type: type = float,
        sparsity_lambda: float = 0,
        few_flow_values_epsilon: float = None,
        edges_to_ignore: list = [],
        edge_error_scaling: dict = {},
        additional_starts: list = [],
        additional_ends: list = [],
        solver_options: dict = {},
        ):
    """
    This class implements a method to optimally correct the weights of a directed acyclic graph, so that:

    - The resulting weights become a flow, i.e. they become a non-negative flow, namely they satisfy the flow conservation constraints.
    - The resulting weights are as close as possible to the original weights, i.e. the sum of the absolute difference between an edge weight and the corrected flow value of the edge, for all edges, is minimized.

    Parameters
    ----------

    - `G: networkx.DiGraph`

        The directed graph to be corrected (which does not need to be acyclic).

    - `flow_attr: str`

        The name of the attribute in the edges of the graph that contains the weight of the edge.

    - `weight_type: type`, optional

        The type of the weights of the edges. It can be either `int` or `float`. Default is `float`.

    - `sparsity_lambda: float`, optional

        The sparsity parameter, used to control the trade-off between the sparsity of the solution and the closeness to the original weights. Default is `0`.
        If `sparsity_lambda` is set to `0`, then the solution will be as close as possible to the original weights. If `sparsity_lambda` is set to a positive value, then the solution will be sparser, i.e. it will have less flow going out of the source.
        The higher the value of `sparsity_lambda`, the sparser the solution will be. **You can use a value different than `0` only for acyclic graphs.** If you set it to a value different than `0` for a cyclic graph, the class will raise an error.**

    - `few_flow_values_epsilon: float`, optional

        The epsilon value (at least zero) used to control the number of distinct values in the corrected flow. If `few_flow_values_epsilon` is set to `None`, then the solution will be as close as possible to the original weights,
        and there is no bound on the number of distinct values in the corrected flow.
        If `few_flow_values_epsilon` is set to a positive value, then the solution will have fewer distinct flow values in the corrected flow, while ensuring that the objective value of the resulting problem is within $(1+\varepsilon)$ of the optimal solution (with this parameter set to `None`).
        The higher the value of `few_flow_values_epsilon`, the smaller the number of flow values in the corrected flow, but possibly higher the sum of edge errors in the corrected flow. Default is `None`.

        !!! warning "Warning"

            Setting this can be slow on larger graphs.

    - `edges_to_ignore: list`, optional

        A list of edges to ignore. The weights of these edges will still be corrected, but their error will not count in the objective function that is being minimized. Default is `[]`. See [ignoring edges documentation](ignoring-edges.md)

    - `edge_error_scaling: dict`, optional

        Dictionary `edge: factor` storing the error scale factor (in [0,1]) of every edge, which scale the allowed difference between edge weight and path weights.
        Default is an empty dict. If an edge has a missing error scale factor, it is assumed to be 1. The factors are used to scale the 
        difference between the flow value of the edge and the sum of the weights of the paths going through the edge. See [ignoring edges documentation](ignoring-edges.md)

    - `additional_starts: list`, optional

        A list of nodes to be added as additional sources. **This applies only to acyclic graphs.** Flow is allowed to start start at these nodes, meaning that their out going flow can be greater than their incoming flow. Default is `[]`. See also [additional start/end nodes documentation](additional-start-end-nodes.md).

    - `additional_ends: list`, optional

        A list of nodes to be added as additional sinks. **This applies only to acyclic graphs.** Flow is allowed to end at these nodes, meaning that their incoming flow can be greater than their outgoing flow. Default is `[]`. See also [additional start/end nodes documentation](additional-start-end-nodes.md).

    - `solver_options: dict`, optional

        A dictionary containing the options for the solver. The options are passed to the solver wrapper. Default is `{}`. See [solver options documentation](solver-options-optimizations.md).
    """

    self.original_graph_copy = deepcopy(G)
    self.sparsity_lambda = sparsity_lambda

    if nx.is_directed_acyclic_graph(G):
        self.is_acyclic = True
        self.G = stdigraph.stDiGraph(G, additional_starts=additional_starts, additional_ends=additional_ends)
        self.edges_to_ignore = set(edges_to_ignore).union(self.G.source_sink_edges)
    else:
        self.G = G
        self.is_acyclic = False
        self.edges_to_ignore = set(edges_to_ignore)
        if self.sparsity_lambda != 0:
            utils.logger.error(f"{__name__}: You cannot set sparsity_lambda != 0 for a graph with cycles.")
            raise ValueError(f"You cannot set sparsity_lambda != 0 for a graph with cycles.")

    self.flow_attr = flow_attr
    if weight_type not in [int, float]:
        utils.logger.error(f"{__name__}: weight_type must be either int or float, not {weight_type}")
        raise ValueError(f"weight_type must be either int or float, not {weight_type}")
    self.weight_type = weight_type
    self.solver_options = solver_options

    self.edge_error_scaling = edge_error_scaling
    # Checking that every entry in self.edge_error_scaling is between 0 and 1
    for key, value in self.edge_error_scaling.items():
        if value < 0 or value > 1:
            utils.logger.error(f"{__name__}: Edge error scaling factor for edge {key} must be between 0 and 1.")
            raise ValueError(f"Edge error scaling factor for edge {key} must be between 0 and 1.")
        if value == 0:
            self.edges_to_ignore.add(key)
    self.different_flow_values_epsilon = few_flow_values_epsilon
    if few_flow_values_epsilon is not None:
        if few_flow_values_epsilon < 0:
            utils.logger.error(f"{__name__}: different_flow_values_epsilon must be greater than or equal to 0, not {few_flow_values_epsilon}")
            raise ValueError(f"different_flow_values_epsilon must be greater than or equal to 0, not {few_flow_values_epsilon}")
        if few_flow_values_epsilon == 0:
            self.different_flow_values_epsilon = None        

    self.__solution = None
    self.__is_solved = None
    self.solve_statistics = dict()

    self.edge_vars = {}
    self.edge_error_vars = {}
    self.edge_sol = {}

    self.w_max = max(
        [
            self.G[u][v].get(self.flow_attr, 0)
            for (u, v) in self.G.edges() 
        ]
    )
    self.ub = self.w_max * self.G.number_of_edges()

    self.__create_solver()

    self.__encode_flow()

    self.__encode_min_sum_errors_objective()  

    utils.logger.info(f"{__name__}: initialized with graph id = {utils.fpid(G)}")  

get_corrected_graph

get_corrected_graph()

Returns the corrected graph, as a networkx DiGraph. This is a deep copy of the original graph, but having the corrected weights.

Warning

Call the solve method first.

Source code in flowpaths/minerrorflow.py
def get_corrected_graph(self):
    """
    Returns the corrected graph, as a networkx DiGraph. This is a deep copy of the original graph, but having the corrected weights.

    !!! warning "Warning"
        Call the `solve` method first.
    """
    solution = self.get_solution()
    return solution["graph"]

get_solution

get_solution()

Returns the solution to the problem, if the model was solved, as a dictionary containing the following keys:

  • graph: the corrected graph, as a networkx DiGraph.
  • error: the error of the solution, i.e. the sum of the absolute differences between the original weights and the corrected weights.
  • objective_value: the value of the objective function.

Warning

Call the solve method first.

Source code in flowpaths/minerrorflow.py
def get_solution(self):
    """
    Returns the solution to the problem, if the model was solved, as a dictionary containing the following keys:

    - `graph`: the corrected graph, as a networkx DiGraph.
    - `error`: the error of the solution, i.e. the sum of the absolute differences between the original weights and the corrected weights.
    - `objective_value`: the value of the objective function.

    !!! warning "Warning"
        Call the `solve` method first.
    """
    if self.__solution is not None:
        return self.__solution

    self.__check_is_solved()

    edge_sol_dict = self.solver.get_variable_values("edge_vars", [str, str])
    for edge in edge_sol_dict.keys():
        self.edge_sol[edge] = (
            round(edge_sol_dict[edge])
            if self.weight_type == int
            else float(edge_sol_dict[edge])
        )

    edge_error_sol_dict = self.solver.get_variable_values("edge_error_vars", [str, str])
    error = sum(edge_error_sol_dict.values())

    corrected_graph = deepcopy(self.original_graph_copy)
    for u, v in corrected_graph.edges():
        if self.flow_attr in corrected_graph[u][v]:
            corrected_graph[u][v][self.flow_attr] = self.edge_sol[(u, v)]

    self.__solution = {
        "graph": corrected_graph,
        "error": error,
        "objective_value": self.solver.get_objective_value(),
    }

    return self.__solution  

is_solved

is_solved()

Returns True if the model was solved, False otherwise.

Source code in flowpaths/minerrorflow.py
def is_solved(self):
    """
    Returns `True` if the model was solved, `False` otherwise.
    """
    return self.__is_solved

solve

solve()

Solves the problem. Returns True if the model was solved, False otherwise.

Source code in flowpaths/minerrorflow.py
def solve(self):
    """
    Solves the problem. Returns `True` if the model was solved, `False` otherwise.
    """
    utils.logger.info(f"{__name__}: solving with graph id = {utils.fpid(self.G)}")
    start_time = time.perf_counter()
    self.solver.optimize()
    self.solve_statistics[f"milp_solve_time"] = (time.perf_counter() - start_time)

    self.solve_statistics[f"milp_solver_status"] = self.solver.get_model_status()

    if self.solver.get_model_status() == "kOptimal":
        if self.different_flow_values_epsilon is None:
            self.__is_solved = True
            utils.logger.info(f"{__name__}: model solved with objective value = {self.solver.get_objective_value()}")
            return True
        else:
            # We need to encode the different flow values variant
            objective_value = self.solver.get_objective_value()

            # If the objective value is 0, then we can stop here
            # because we cannot get a different solution
            if objective_value == 0:
                self.__is_solved = True
                utils.logger.info(f"{__name__}: model solved with objective value = {objective_value}. We could not find change the flow values because the objective function was 0.")
                return True

            self.__is_solved = True # START hack to get the corrected graph                
            corrected_graph = self.get_corrected_graph()
            self.__is_solved = False # END hack to get the corrected graph

            # Pick 30 random edges of G.edges()
            edge_subset = [e for e in self.original_graph_copy.edges()]
            # edge_subset = edge_subset[:30]        

            # Getting all the different 'flow_attr' values in the corrected graph
            ub_different_flow_values = len(set(
                corrected_graph[u][v].get(self.flow_attr, 0)
                for (u, v) in edge_subset
            ))

            utils.logger.info(f"{__name__}: re-solving now by minimizing the number of different flow values within 1 + epsilon tolerance to the objective value, i.e. <=(1+{self.different_flow_values_epsilon})*{objective_value}")
            self.__create_solver()
            self.__encode_flow()
            self.__encode_different_flow_values_and_objective(
                edge_subset=edge_subset,
                objective_value=objective_value,
                ub_different_flow_values=ub_different_flow_values,
            )
            self.solver.optimize()
            self.solve_statistics[f"milp_solve_time"] += (time.perf_counter() - start_time)
            self.solve_statistics[f"milp_solver_status"] = self.solver.get_model_status()

            if self.solver.get_model_status() == "kOptimal":
                self.__is_solved = True
                utils.logger.info(f"{__name__}: model solved with objective value = {objective_value}")
                return True
            else:
                utils.logger.warning(f"{__name__}: model not solved, status = {self.solver.get_model_status()}")

    self.__is_solved = False
    return False