Minimum Correction of Weights to a Flow
Often, the edge weights of a graph are not a flow (i.e. do not satisfy flow conservation for non- source/sink nodes). While the models k-Minimum Path Error or k-Least Absolute Errors can decompose such graphs, as a less principled approach, one can first minimally correct the graph weights to become a flow, and then optimally decompose the resulting flow using the Minimum Flow Decomposition model.
This is faster in practice, because the Minimum Flow Decomposition solver is faster than the ones decomposing graphs without flow conservation. We are thus delegating error correction to a pre-processing step, and then avoiding the error-handling difficulty when decomposing the resulting graph.
1. Definition
This class solves the following problem.
-
INPUT: A directed graph \(G = (V,E)\) and a weight \(w(u,v)\) for every edge \((u,v)\) of \(G\). Weights do not necessarily satisfy flow conservation.
-
OUTPUT: A flow value \(f(u,v)\) for every edge \((u,v) \in E\), satisfying the flow conservation property for all non-source and non-sink nodes \(v\):
$$
\sum_{(u,v) \in E} f(u,v) = \sum_{(v,u) \in E} f(v,u),
$$
minimizing the sum of absolute errors:
$$
\sum_{(u,v) \in E}\Big|f(u,v) - w(u,v)\Big|.
$$
2. Examples
Let’s consider the following weighted graph, whose weights do not satisfy flow conservation, because the edges shown in brown are erroneous.
flowchart LR
s((s))
a((a))
b((b))
c((c))
d((d))
t((t))
s -->|7| a
a -->|2| b
s -->|7| b
a -->|4| c
b -->|9| c
c -->|7| d
d -->|6| t
c -->|7| t
linkStyle 0,5 stroke:brown,stroke-width:3;
Let’s then minimally correct its weights with default settings.
import flowpaths as fp
import networkx as nx
graph = nx.DiGraph()
graph.add_edge("s", "a", flow=7)
graph.add_edge("s", "b", flow=7)
graph.add_edge("a", "b", flow=2)
graph.add_edge("a", "c", flow=4)
graph.add_edge("b", "c", flow=9)
graph.add_edge("c", "d", flow=7)
graph.add_edge("c", "t", flow=7)
graph.add_edge("d", "t", flow=6)
# We create a the Minimum Error Flow solver with default settings
correction_model = fp.MinErrorFlow(graph, flow_attr="flow")
correction_model.solve()
The resulting graph is below. The edges whose weights have been corrected are in green.
flowchart LR
s((s))
a((a))
b((b))
c((c))
d((d))
t((t))
s -->|6| a
a -->|2| b
s -->|7| b
a -->|4| c
b -->|9| c
c -->|6| d
d -->|6| t
c -->|7| t
linkStyle 0,5 stroke:green,stroke-width:3;
Once we have corrected the weights, we can get the corrected graph and e.g. apply the minimum flow decomposition model on it.
if correction_model.is_solved():
corrected_graph = correction_model.get_corrected_graph()
# This is equivalent to: corrected_graph = correction_model.get_solution()["graph"]
mfd_model = fp.MinFlowDecomp(corrected_graph, flow_attr="flow")
mfd_model.solve()
if mfd_model.is_solved():
print(mfd_model.get_solution())
# {'paths': [
# ['s', 'b', 'c', 't'],
# ['s', 'a', 'c', 'd', 't'],
# ['s', 'a', 'b', 'c', 'd', 't']],
# 'weights': [7.0, 4.0, 2.0]}
This gives the following paths.
flowchart LR
s((s))
a((a))
b((b))
c((c))
d((d))
t((t))
s -->|4| a
a -->|4| c
c -->|4| d
d -->|4| t
linkStyle 0,1,2,3 stroke:red,stroke-width:3;
s -->|2| a
a -->|2| b
b -->|2| c
c -->|2| d
d -->|2| t
linkStyle 4,5,6,7,8 stroke:orange,stroke-width:3;
s -->|7| b
b -->|7| c
c -->|7| t
linkStyle 9,10,11 stroke:blue,stroke-width:3;
3. Generalizations
This class implements a more general version, as follows:
- For acyclic graphs, the corrected flow can start/end not only in source/sink nodes, but also in given sets of start/end nodes (set parameters
additional_starts
and additional_ends
). See also Additional start/end nodes.
- The error can count only for a given subset \(E' \subseteq E\) of the edges (set parameter
elements_to_ignore
to be \(E \setminus E'\)). See also ignoring edges documentation.
- The error (i.e. the above absolute of the difference) of every edge can contribute differently to the objective function, according to a scale factor \(\in [0,1]\). Set these via a dictionary that you pass to
error_scaling
, which stores the scale factor \(\lambda_{(u,v)} \in [0,1]\) of each edge \((u,v)\) in the dictionary. Setting \(\lambda_{(u,v)} = 0\) will add the edge \((u,v)\) to elements_to_ignore
, because the constraint for \((u,v)\) becomes always true. See also ignoring edges documentation.
- For acyclic graphs, one can also ensure some “sparsity” in the solution, meaning the total corrected flow exiting the source node is counts also in the minimization function, with a given multiplier \(\lambda\) (see ref. [2]). If \(\lambda = 0\), this has no effect.
Generalized objective function
Formally, the objective function generalized as in 2., 3. and 4. above is:
$$
\sum_{(u,v) \in E’}\lambda_{(u,v)} \cdot \Big|f(u,v) - w(u,v)\Big| + \lambda \sum_{(s,v) \in E} f(s,v).
$$
4. References
- Alexandru I. Tomescu, Anna Kuosmanen, Romeo Rizzi, Veli Mäkinen
A novel min-cost flow method for estimating transcript expression with RNA-Seq BMC Bioinformatics 14(S-5), S15, 2013 (preprint)
- Elsa Bernard, Laurent Jacob, Julien Mairal, Jean-Philippe Vert
Efficient RNA isoform identification and quantification from RNA-Seq data with network flows, Bioinformatics, Volume 30, Issue 17, September 2014, Pages 2447–2455
MinErrorFlow
MinErrorFlow(
G: DiGraph,
flow_attr: str,
flow_attr_origin: str = "edge",
weight_type: type = float,
sparsity_lambda: float = 0,
few_flow_values_epsilon: float = None,
elements_to_ignore: list = [],
error_scaling: dict = {},
additional_starts: list = [],
additional_ends: list = [],
solver_options: dict = {},
)
This class implements a method to optimally correct the weights of a directed acyclic graph, so that:
- The resulting weights become a flow, i.e. they become a non-negative flow, namely they satisfy the flow conservation constraints.
- The resulting weights are as close as possible to the original weights, i.e. the sum of the absolute difference between an edge weight and the corrected flow value of the edge, for all edges, is minimized.
Parameters
-
G: networkx.DiGraph
The directed graph to be corrected (which does not need to be acyclic).
-
flow_attr: str
The name of the attribute in the edges of the graph that contains the weight of the edge.
-
flow_attr_origin : str
, optional
The origin of the flow attribute. Default is "edge"
. Options:
"edge"
: the flow attribute is assumed to be on the edges of the graph.
"node"
: the flow attribute is assumed to be on the nodes of the graph. See the documentation on how node-weighted graphs are handled.
-
weight_type: type
, optional
The type of the weights of the edges. It can be either int
or float
. Default is float
.
-
sparsity_lambda: float
, optional
The sparsity parameter, used to control the trade-off between the sparsity of the solution and the closeness to the original weights. Default is 0
.
If sparsity_lambda
is set to 0
, then the solution will be as close as possible to the original weights. If sparsity_lambda
is set to a positive value, then the solution will be sparser, i.e. it will have less flow going out of the source.
The higher the value of sparsity_lambda
, the sparser the solution will be. You can use a value different than 0
only for acyclic graphs. If you set it to a value different than 0
for a cyclic graph, the class will raise an error.**
-
few_flow_values_epsilon: float
, optional
The epsilon value (at least zero) used to control the number of distinct values in the corrected flow. If few_flow_values_epsilon
is set to None
, then the solution will be as close as possible to the original weights,
and there is no bound on the number of distinct values in the corrected flow.
If few_flow_values_epsilon
is set to a positive value, then the solution will have fewer distinct flow values in the corrected flow, while ensuring that the objective value of the resulting problem is within \((1+arepsilon)\) of the optimal solution (with this parameter set to None
).
The higher the value of few_flow_values_epsilon
, the smaller the number of flow values in the corrected flow, but possibly higher the sum of edge errors in the corrected flow. Default is None
.
Warning
Setting this can be slow on larger graphs.
-
elements_to_ignore : list
, optional
List of edges (or nodes, if flow_attr_origin
is "node"
) to ignore when adding constrains on flow explanation by the weighted paths.
Default is an empty list. See ignoring edges documentation
-
error_scaling: dict
, optional
Dictionary edge: factor
(or node: factor
, if flow_attr_origin
is "node"
)) storing the error scale factor (in [0,1]) of every edge, which scale the allowed difference between edge/node weight and path weights.
Default is an empty dict. If an edge/node has a missing error scale factor, it is assumed to be 1. The factors are used to scale the
difference between the flow value of the edge/node and the sum of the weights of the paths going through the edge/node. See ignoring edges documentation
-
additional_starts: list
, optional
A list of nodes to be added as additional sources. This applies only to acyclic graphs. Flow is allowed to start start at these nodes, meaning that their out going flow can be greater than their incoming flow. Default is []
. See also additional start/end nodes documentation.
-
additional_ends: list
, optional
A list of nodes to be added as additional sinks. This applies only to acyclic graphs. Flow is allowed to end at these nodes, meaning that their incoming flow can be greater than their outgoing flow. Default is []
. See also additional start/end nodes documentation.
-
solver_options: dict
, optional
A dictionary containing the options for the solver. The options are passed to the solver wrapper. Default is {}
. See solver options documentation.
Source code in flowpaths/minerrorflow.py
| def __init__(
self,
G: nx.DiGraph,
flow_attr: str,
flow_attr_origin: str = "edge",
weight_type: type = float,
sparsity_lambda: float = 0,
few_flow_values_epsilon: float = None,
elements_to_ignore: list = [],
error_scaling: dict = {},
additional_starts: list = [],
additional_ends: list = [],
solver_options: dict = {},
):
"""
This class implements a method to optimally correct the weights of a directed acyclic graph, so that:
- The resulting weights become a flow, i.e. they become a non-negative flow, namely they satisfy the flow conservation constraints.
- The resulting weights are as close as possible to the original weights, i.e. the sum of the absolute difference between an edge weight and the corrected flow value of the edge, for all edges, is minimized.
Parameters
----------
- `G: networkx.DiGraph`
The directed graph to be corrected (which does not need to be acyclic).
- `flow_attr: str`
The name of the attribute in the edges of the graph that contains the weight of the edge.
- `flow_attr_origin : str`, optional
The origin of the flow attribute. Default is `"edge"`. Options:
- `"edge"`: the flow attribute is assumed to be on the edges of the graph.
- `"node"`: the flow attribute is assumed to be on the nodes of the graph. See [the documentation](node-expanded-digraph.md) on how node-weighted graphs are handled.
- `weight_type: type`, optional
The type of the weights of the edges. It can be either `int` or `float`. Default is `float`.
- `sparsity_lambda: float`, optional
The sparsity parameter, used to control the trade-off between the sparsity of the solution and the closeness to the original weights. Default is `0`.
If `sparsity_lambda` is set to `0`, then the solution will be as close as possible to the original weights. If `sparsity_lambda` is set to a positive value, then the solution will be sparser, i.e. it will have less flow going out of the source.
The higher the value of `sparsity_lambda`, the sparser the solution will be. **You can use a value different than `0` only for acyclic graphs.** If you set it to a value different than `0` for a cyclic graph, the class will raise an error.**
- `few_flow_values_epsilon: float`, optional
The epsilon value (at least zero) used to control the number of distinct values in the corrected flow. If `few_flow_values_epsilon` is set to `None`, then the solution will be as close as possible to the original weights,
and there is no bound on the number of distinct values in the corrected flow.
If `few_flow_values_epsilon` is set to a positive value, then the solution will have fewer distinct flow values in the corrected flow, while ensuring that the objective value of the resulting problem is within $(1+\varepsilon)$ of the optimal solution (with this parameter set to `None`).
The higher the value of `few_flow_values_epsilon`, the smaller the number of flow values in the corrected flow, but possibly higher the sum of edge errors in the corrected flow. Default is `None`.
!!! warning "Warning"
Setting this can be slow on larger graphs.
- `elements_to_ignore : list`, optional
List of edges (or nodes, if `flow_attr_origin` is `"node"`) to ignore when adding constrains on flow explanation by the weighted paths.
Default is an empty list. See [ignoring edges documentation](ignoring-edges.md)
- `error_scaling: dict`, optional
Dictionary `edge: factor` (or `node: factor`, if `flow_attr_origin` is `"node"`)) storing the error scale factor (in [0,1]) of every edge, which scale the allowed difference between edge/node weight and path weights.
Default is an empty dict. If an edge/node has a missing error scale factor, it is assumed to be 1. The factors are used to scale the
difference between the flow value of the edge/node and the sum of the weights of the paths going through the edge/node. See [ignoring edges documentation](ignoring-edges.md)
- `additional_starts: list`, optional
A list of nodes to be added as additional sources. **This applies only to acyclic graphs.** Flow is allowed to start start at these nodes, meaning that their out going flow can be greater than their incoming flow. Default is `[]`. See also [additional start/end nodes documentation](additional-start-end-nodes.md).
- `additional_ends: list`, optional
A list of nodes to be added as additional sinks. **This applies only to acyclic graphs.** Flow is allowed to end at these nodes, meaning that their incoming flow can be greater than their outgoing flow. Default is `[]`. See also [additional start/end nodes documentation](additional-start-end-nodes.md).
- `solver_options: dict`, optional
A dictionary containing the options for the solver. The options are passed to the solver wrapper. Default is `{}`. See [solver options documentation](solver-options-optimizations.md).
"""
# Handling node-weighted graphs
self.flow_attr_origin = flow_attr_origin
if self.flow_attr_origin == "node":
self.G_internal = nedg.NodeExpandedDiGraph(G, node_flow_attr=flow_attr)
additional_starts_internal = self.G_internal.get_expanded_additional_starts(additional_starts)
additional_ends_internal = self.G_internal.get_expanded_additional_ends(additional_ends)
edges_to_ignore_internal = self.G_internal.edges_to_ignore
if not all(isinstance(node, str) for node in elements_to_ignore):
utils.logger.error(f"elements_to_ignore must be a list of nodes, i.e. strings, not {elements_to_ignore}")
raise ValueError(f"elements_to_ignore must be a list of nodes, i.e. strings, not {elements_to_ignore}")
edges_to_ignore_internal += [self.G_internal.get_expanded_edge(node) for node in elements_to_ignore]
edges_to_ignore_internal = list(set(edges_to_ignore_internal))
error_scaling_internal = {self.G_internal.get_expanded_edge(node): error_scaling[node] for node in error_scaling}
elif self.flow_attr_origin == "edge":
self.G_internal = G
additional_starts_internal = additional_starts
additional_ends_internal = additional_ends
if not all(isinstance(edge, tuple) and len(edge) == 2 for edge in elements_to_ignore):
utils.logger.error(f"elements_to_ignore must be a list of edges (i.e. tuples of nodes), not {elements_to_ignore}")
raise ValueError(f"elements_to_ignore must be a list of edges (i.e. tuples of nodes), not {elements_to_ignore}")
edges_to_ignore_internal = elements_to_ignore
error_scaling_internal = error_scaling
else:
utils.logger.error(f"flow_attr_origin must be either 'node' or 'edge', not {self.flow_attr_origin}")
raise ValueError(f"flow_attr_origin must be either 'node' or 'edge', not {self.flow_attr_origin}")
self.original_graph_copy = deepcopy(self.G_internal)
self.sparsity_lambda = sparsity_lambda
if nx.is_directed_acyclic_graph(self.G_internal):
self.is_acyclic = True
self.G = stdigraph.stDiGraph(self.G_internal, additional_starts=additional_starts_internal, additional_ends=additional_ends_internal)
self.edges_to_ignore = set(edges_to_ignore_internal).union(self.G.source_sink_edges)
else:
self.G = self.G_internal
self.is_acyclic = False
self.edges_to_ignore = set(edges_to_ignore_internal)
if self.sparsity_lambda != 0:
utils.logger.error(f"{__name__}: You cannot set sparsity_lambda != 0 for a graph with cycles.")
raise ValueError(f"You cannot set sparsity_lambda != 0 for a graph with cycles.")
self.edge_error_scaling = error_scaling_internal
# If the error scaling factor is 0, we ignore the edge
self.edges_to_ignore |= {edge for edge, factor in self.edge_error_scaling.items() if factor == 0}
# Checking that every entry in self.error_scaling is between 0 and 1
for key, value in error_scaling.items():
if value < 0 or value > 1:
utils.logger.error(f"{__name__}: Error scaling factor for {key} must be between 0 and 1.")
raise ValueError(f"Error scaling factor for {key} must be between 0 and 1.")
self.flow_attr = flow_attr
if weight_type not in [int, float]:
utils.logger.error(f"{__name__}: weight_type must be either int or float, not {weight_type}")
raise ValueError(f"weight_type must be either int or float, not {weight_type}")
self.weight_type = weight_type
self.solver_options = solver_options
# Checking that every entry in self.edge_error_scaling is between 0 and 1
for key, value in self.edge_error_scaling.items():
if value < 0 or value > 1:
utils.logger.error(f"{__name__}: Error scaling factor for {key} must be between 0 and 1.")
raise ValueError(f"Error scaling factor for {key} must be between 0 and 1.")
self.different_flow_values_epsilon = few_flow_values_epsilon
if few_flow_values_epsilon is not None:
if few_flow_values_epsilon < 0:
utils.logger.error(f"{__name__}: different_flow_values_epsilon must be greater than or equal to 0, not {few_flow_values_epsilon}")
raise ValueError(f"different_flow_values_epsilon must be greater than or equal to 0, not {few_flow_values_epsilon}")
if few_flow_values_epsilon == 0:
self.different_flow_values_epsilon = None
self._solution = None
self._is_solved = None
self.solve_statistics = dict()
self.edge_vars = {}
self.edge_error_vars = {}
self.edge_sol = {}
self.w_max = max(
[
self.G[u][v].get(self.flow_attr, 0)
for (u, v) in self.G.edges()
]
)
self.ub = self.w_max * self.G.number_of_edges()
self._create_solver()
self._encode_flow()
self._encode_min_sum_errors_objective()
utils.logger.info(f"{__name__}: initialized with graph id = {utils.fpid(G)}")
|
get_corrected_graph
Returns the corrected graph, as a networkx DiGraph. This is a deep copy of the original graph, but having the corrected weights.
Warning
Call the solve
method first.
Source code in flowpaths/minerrorflow.py
| def get_corrected_graph(self):
"""
Returns the corrected graph, as a networkx DiGraph. This is a deep copy of the original graph, but having the corrected weights.
!!! warning "Warning"
Call the `solve` method first.
"""
solution = self.get_solution()
return solution["graph"]
|
get_objective_value
Returns the sum of the errors of the optimum solution.
Warning
Call the solve
method first.
Source code in flowpaths/minerrorflow.py
| def get_objective_value(self):
"""
Returns the sum of the errors of the optimum solution.
!!! warning "Warning"
Call the `solve` method first.
"""
solution = self.get_solution()
return solution["error"]
|
get_solution
Returns the solution to the problem, if the model was solved, as a dictionary containing the following keys:
graph
: the corrected graph, as a networkx DiGraph.
error
: the error of the solution, i.e. the sum of the absolute differences between the original weights and the corrected weights.
objective_value
: the value of the objective function.
Warning
Call the solve
method first.
Source code in flowpaths/minerrorflow.py
| def get_solution(self):
"""
Returns the solution to the problem, if the model was solved, as a dictionary containing the following keys:
- `graph`: the corrected graph, as a networkx DiGraph.
- `error`: the error of the solution, i.e. the sum of the absolute differences between the original weights and the corrected weights.
- `objective_value`: the value of the objective function.
!!! warning "Warning"
Call the `solve` method first.
"""
if self._solution is not None:
return self._solution
self._check_is_solved()
edge_sol_dict = self.solver.get_variable_values("edge_vars", [str, str])
for edge in edge_sol_dict.keys():
self.edge_sol[edge] = (
round(edge_sol_dict[edge])
if self.weight_type == int
else float(edge_sol_dict[edge])
)
edge_error_sol_dict = self.solver.get_variable_values("edge_error_vars", [str, str])
error = sum(edge_error_sol_dict.values())
corrected_graph = deepcopy(self.original_graph_copy)
for u, v in corrected_graph.edges():
if self.flow_attr in corrected_graph[u][v]:
corrected_graph[u][v][self.flow_attr] = self.edge_sol[(u, v)]
if self.flow_attr_origin == "edge":
self._solution = {
"graph": corrected_graph,
"error": error,
"objective_value": self.solver.get_objective_value(),
}
elif self.flow_attr_origin == "node":
self._solution = {
"graph": corrected_graph.get_condensed_graph(),
"error": error,
"objective_value": self.solver.get_objective_value(),
}
return self._solution
|
is_solved
Returns True
if the model was solved, False
otherwise.
Source code in flowpaths/minerrorflow.py
| def is_solved(self):
"""
Returns `True` if the model was solved, `False` otherwise.
"""
return self._is_solved
|
solve
Solves the problem. Returns True
if the model was solved, False
otherwise.
Source code in flowpaths/minerrorflow.py
| def solve(self):
"""
Solves the problem. Returns `True` if the model was solved, `False` otherwise.
"""
utils.logger.info(f"{__name__}: solving with graph id = {utils.fpid(self.G)}")
start_time = time.perf_counter()
self.solver.optimize()
self.solve_statistics[f"milp_solve_time"] = (time.perf_counter() - start_time)
self.solve_statistics[f"milp_solver_status"] = self.solver.get_model_status()
if self.solver.get_model_status() == "kOptimal":
if self.different_flow_values_epsilon is None:
self._is_solved = True
utils.logger.info(f"{__name__}: model solved with objective value = {self.solver.get_objective_value()}")
return True
else:
# We need to encode the different flow values variant
objective_value = self.solver.get_objective_value()
# If the objective value is 0, then we can stop here
# because we cannot get a different solution
if objective_value == 0:
self._is_solved = True
utils.logger.info(f"{__name__}: model solved with objective value = {objective_value}. We could not find change the flow values because the objective function was 0.")
return True
self._is_solved = True # START hack to get the corrected graph
corrected_graph = self.get_corrected_graph()
self._is_solved = False # END hack to get the corrected graph
# Pick 30 random edges of G.edges()
edge_subset = [e for e in self.original_graph_copy.edges()]
# edge_subset = edge_subset[:30]
# Getting all the different 'flow_attr' values in the corrected graph
ub_different_flow_values = len(set(
corrected_graph[u][v].get(self.flow_attr, 0)
for (u, v) in edge_subset
))
utils.logger.info(f"{__name__}: re-solving now by minimizing the number of different flow values within 1 + epsilon tolerance to the objective value, i.e. <=(1+{self.different_flow_values_epsilon})*{objective_value}")
self._create_solver()
self._encode_flow()
self._encode_different_flow_values_and_objective(
edge_subset=edge_subset,
objective_value=objective_value,
ub_different_flow_values=ub_different_flow_values,
)
self.solver.optimize()
self.solve_statistics[f"milp_solve_time"] += (time.perf_counter() - start_time)
self.solve_statistics[f"milp_solver_status"] = self.solver.get_model_status()
if self.solver.get_model_status() == "kOptimal":
self._is_solved = True
utils.logger.info(f"{__name__}: model solved with objective value = {objective_value}")
return True
else:
utils.logger.warning(f"{__name__}: model not solved, status = {self.solver.get_model_status()}")
self._is_solved = False
return False
|