k-Minimum Discordant Nodes
The k-Minimum Discordant Nodes model is inspired by the transcript-assembly formulation introduced by Zhao et al. (MultiTrans, 2022). Given node flow values and a tolerance parameter, it finds \(k\) weighted source-to-sink paths whose induced node flow explanations minimize the number of nodes that are discordant.
1. Definition
The k-Minimum Discordant Nodes problem on a directed acyclic graph (DAG) is defined as follows.
-
INPUT:
- A directed graph \(G = (V,E)\) with non-negative observed flow values \(f(v)\) on nodes.
- A tolerance value \(\tau \geq 0\).
- \(k \in \mathbb{Z}_+\).
-
OUTPUT: A list of \(k\) source-to-sink paths \(P_1,\dots,P_k\) with associated non-negative weights \(w_1,\dots,w_k\), minimizing the number of discordant nodes.
For each node \(v\), define the explained flow
$$
\widehat{f}(v) = \sum_{i : v \in P_i} w_i.
$$
Node \(v\) is discordant if
$$
\widehat{f}(v) \notin \left[(1-\tau)f(v),\ (1+\tau)f(v)\right].
$$
The objective is to minimize the number of discordant nodes.
Note
- This model takes node flow values as input (
flow_attr on nodes) and uses the node-expanded graph construction internally.
- The graph may have multiple sources and sinks.
For example, in the graph below, all node flow values are shown on the nodes.
flowchart LR
s((s:5))
a((a:10))
b((b:5))
t((t:5))
s --> a
a --> b
b --> t
With \(k=1\), no single weighted path can explain all node flows exactly, so at least one node is discordant. With a larger \(k\), discordance can decrease or remain the same.
2. Solving the problem
import flowpaths as fp
import networkx as nx
G = nx.DiGraph()
G.add_node("s", flow=5)
G.add_node("a", flow=10)
G.add_node("b", flow=5)
G.add_node("t", flow=5)
G.add_edge("s", "a")
G.add_edge("a", "b")
G.add_edge("b", "t")
model = fp.kMinDiscordantNodes(
G=G,
flow_attr="flow",
k=2,
discordance_tolerance=0.1,
)
model.solve()
if model.is_solved():
sol = model.get_solution()
print(sol["paths"])
print(sol["weights"])
print(sol["discordant_nodes"])
3. References
-
Jin Zhao, Haodi Feng, Daming Zhu, Yu Lin,
MultiTrans: An Algorithm for Path Extraction Through Mixed Integer Linear Programming for Transcriptome Assembly,
IEEE/ACM Transactions on Computational Biology and Bioinformatics 19(1), 48-56, 2022.
-
See also flowpaths References.
kMinDiscordantNodes
kMinDiscordantNodes(
G: DiGraph,
flow_attr: str,
k: int,
weight_type: type = float,
discordance_tolerance: float = 0.1,
subpath_constraints: list = [],
additional_starts: list = [],
additional_ends: list = [],
optimization_options: dict = None,
solver_options: dict = {},
)
Bases: AbstractPathModelDAG
This class implements the k-MinDiscordantNodes problem, namely it looks for a decomposition of a weighted DAG into
\(k\) weighted paths, minimizing the number of discordant nodes. A node is discordant
if the sum of the weights of the solution paths traversing it lies outside the interval
[(1 - discordance_tolerance) * observed_flow,
(1 + discordance_tolerance) * observed_flow].
Additionally, it is required that every node and every edge appear in some solution path.
Parameters
-
G: nx.DiGraph
The input directed acyclic graph, as networkx DiGraph.
-
k: int
The number of paths to decompose in.
-
weight_type: int | float, optional
The type of the weights and slacks (int or float). Default is float.
-
subpath_constraints: list, optional
List of subpath constraints. Default is an empty list.
Each subpath constraint is a list of edges that must be covered by some solution path, according
to the subpath_constraints_coverage or subpath_constraints_coverage_length parameters (see below).
-
additional_starts: list, optional
List of additional start nodes of the paths. Default is an empty list.
-
additional_ends: list, optional
List of additional end nodes of the paths. Default is an empty list.
-
optimization_options: dict, optional
Dictionary with the optimization options. Default is None. See optimization options documentation.
-
solver_options: dict, optional
Dictionary with the solver options. Default is {}. See solver options documentation.
Raises
-
ValueError
- If
weight_type is not int or float.
- If the edge error scaling factor is not in [0,1].
- If the flow attribute
flow_attr is not specified in some edge.
- If the graph contains edges with negative flow values.
Source code in flowpaths/kmindiscordantnodes.py
| def __init__(
self,
G: nx.DiGraph,
flow_attr: str,
k: int,
weight_type: type = float,
discordance_tolerance: float = 0.1,
subpath_constraints: list = [],
additional_starts: list = [],
additional_ends: list = [],
optimization_options: dict = None,
solver_options: dict = {},
):
"""
This class implements the k-MinDiscordantNodes problem, namely it looks for a decomposition of a weighted DAG into
$k$ weighted paths, minimizing the number of *discordant* nodes. A node is *discordant*
if the sum of the weights of the solution paths traversing it lies outside the interval
[(1 - discordance_tolerance) * observed_flow,
(1 + discordance_tolerance) * observed_flow].
Additionally, it is required that every node and every edge appear in some solution path.
Parameters
----------
- `G: nx.DiGraph`
The input directed acyclic graph, as [networkx DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html).
- `k: int`
The number of paths to decompose in.
- `weight_type: int | float`, optional
The type of the weights and slacks (`int` or `float`). Default is `float`.
- `subpath_constraints: list`, optional
List of subpath constraints. Default is an empty list.
Each subpath constraint is a list of edges that must be covered by some solution path, according
to the `subpath_constraints_coverage` or `subpath_constraints_coverage_length` parameters (see below).
- `additional_starts: list`, optional
List of additional start nodes of the paths. Default is an empty list.
- `additional_ends: list`, optional
List of additional end nodes of the paths. Default is an empty list.
- `optimization_options: dict`, optional
Dictionary with the optimization options. Default is `None`. See [optimization options documentation](solver-options-optimizations.md).
- `solver_options: dict`, optional
Dictionary with the solver options. Default is `{}`. See [solver options documentation](solver-options-optimizations.md).
Raises
------
- `ValueError`
- If `weight_type` is not `int` or `float`.
- If the edge error scaling factor is not in [0,1].
- If the flow attribute `flow_attr` is not specified in some edge.
- If the graph contains edges with negative flow values.
"""
utils.logger.info(f"{__name__}: START initialized with graph id = {utils.fpid(G)}, k = {k}")
# Handling node-weighted graphs
if G.number_of_nodes() == 0:
utils.logger.error(f"{__name__}: The input graph G has no nodes. Please provide a graph with at least one node.")
raise ValueError(f"The input graph G has no nodes. Please provide a graph with at least one node.")
self.G_internal = nedg.NodeExpandedDiGraph(G, node_flow_attr=flow_attr)
subpath_constraints_internal = self.G_internal.get_expanded_subpath_constraints(subpath_constraints)
additional_starts_internal = self.G_internal.get_expanded_additional_starts(additional_starts)
additional_ends_internal = self.G_internal.get_expanded_additional_ends(additional_ends)
edges_to_ignore_internal = self.G_internal.edges_to_ignore
self.G = stdag.stDAG(self.G_internal, additional_starts=additional_starts_internal, additional_ends=additional_ends_internal)
self.subpath_constraints = subpath_constraints_internal
self.edges_to_ignore = self.G.source_sink_edges.union(edges_to_ignore_internal)
# Add nodes and edges are required to be covered by the solution paths,
# thus all edges of the NodeExpanded Digraphs are trusted for safety
self.trusted_edges_for_safety = self.G_internal.edges()
if weight_type not in [int, float]:
utils.logger.error(f"{__name__}: weight_type must be either int or float, not {weight_type}")
raise ValueError(f"weight_type must be either int or float, not {weight_type}")
self.weight_type = weight_type
self.k = k
self.original_k = k
self.optimization_options = optimization_options or {}
self.discordance_tolerance = discordance_tolerance
self.flow_attr = flow_attr
self.w_max = self.k * self.weight_type(
self.G.get_max_flow_value_and_check_non_negative_flow(
flow_attr=self.flow_attr, edges_to_ignore=self.edges_to_ignore
)
)
self.pi_vars = {}
self.path_weights_vars = {}
self.discordant_edge_vars = {}
self.discordant_low_vars = {}
self.discordant_high_vars = {}
self.path_weights_sol = None
self.discordant_edges_sol = None
self.discordant_nodes_sol = None
self._solution = None
self._lowerbound_k = None
self.solve_statistics = {}
self.optimization_options["trusted_edges_for_safety"] = set(self.trusted_edges_for_safety)
# Call the constructor of the parent class AbstractPathModelDAG
super().__init__(
self.G,
self.k,
subpath_constraints=self.subpath_constraints,
optimization_options=self.optimization_options,
solver_options=solver_options,
solve_statistics=self.solve_statistics
)
# This method is called from the super class AbstractPathModelDAG
self.create_solver_and_paths()
# This method is called from the current class to encode the problem
self._encode_discordance_decomposition()
# This method is called from the current class to encode that every node and edge is covered by at least one path
self._encode_cover_every_node_edge_constraints()
# This method is called from the current class to add the objective function
self._encode_objective()
utils.logger.info(f"{__name__}: END initialized with graph id = {utils.fpid(G)}, k = {self.k}")
|
get_solution
get_solution(
remove_empty_paths=True,
)
Retrieves the solution for the flow decomposition problem.
If the solution has already been computed and cached as self.solution, it returns the cached solution.
Otherwise, it checks if the problem has been solved, computes the solution paths, weights, slacks
and caches the solution.
Returns
-
solution: dict
A dictionary containing the solution paths (key "paths") and their corresponding
weights (key "weights"), and binary discordance indicators in the
original graph (key "discordant_nodes", keyed by original nodes).
Raises
exception If model is not solved.
Source code in flowpaths/kmindiscordantnodes.py
| def get_solution(self, remove_empty_paths=True):
"""
Retrieves the solution for the flow decomposition problem.
If the solution has already been computed and cached as `self.solution`, it returns the cached solution.
Otherwise, it checks if the problem has been solved, computes the solution paths, weights, slacks
and caches the solution.
Returns
-------
- `solution: dict`
A dictionary containing the solution paths (key `"paths"`) and their corresponding
weights (key `"weights"`), and binary discordance indicators in the
original graph (key `"discordant_nodes"`, keyed by original nodes).
Raises
-------
- `exception` If model is not solved.
"""
if self._solution is not None:
return self._remove_empty_paths(self._solution) if remove_empty_paths else self._solution
self.check_is_solved()
weights_sol_dict = self.solver.get_values(self.path_weights_vars)
self.path_weights_sol = [
(
round(weights_sol_dict[i])
if self.weight_type == int
else float(weights_sol_dict[i])
)
for i in range(self.k)
]
self.discordant_edges_sol = self.solver.get_values(self.discordant_edge_vars)
for (u, v) in self.edge_indexes_basic:
self.discordant_edges_sol[(u, v)] = int(round(self.discordant_edges_sol[(u, v)]))
self.discordant_nodes_sol = {}
for expanded_edge, z_value in self.discordant_edges_sol.items():
condensed_node = self.G_internal.get_condensed_node(expanded_edge)
if condensed_node is not None:
self.discordant_nodes_sol[condensed_node] = z_value
else:
utils.logger.error(
f"{__name__}: Error: expanded edge {expanded_edge} does not correspond to any condensed node. This should not happen."
)
self._solution = {
"_paths_internal": self.get_solution_paths(),
"paths": self.G_internal.get_condensed_paths(self.get_solution_paths()),
"weights": self.path_weights_sol,
"discordant_nodes": self.discordant_nodes_sol, # Keys are original graph nodes, values are in {0,1}.
}
return self._remove_empty_paths(self._solution) if remove_empty_paths else self._solution
|
is_valid_solution
is_valid_solution(
tolerance=0.001,
)
Checks if the solution is valid by comparing the flow from paths with the flow attribute in the graph edges.
Raises
- ValueError: If the solution is not available (i.e., self.solution is None).
Returns
- bool: True if the solution is valid, False otherwise.
Notes
get_solution() must be called before this method.
- The solution is considered valid if the flow from paths is equal
(up to
TOLERANCE * num_paths_on_edges[(u, v)]) to the flow value of the graph edges.
Source code in flowpaths/kmindiscordantnodes.py
| def is_valid_solution(self, tolerance=0.001):
"""
Checks if the solution is valid by comparing the flow from paths with the flow attribute in the graph edges.
Raises
------
- ValueError: If the solution is not available (i.e., self.solution is None).
Returns
-------
- bool: True if the solution is valid, False otherwise.
Notes
-------
- `get_solution()` must be called before this method.
- The solution is considered valid if the flow from paths is equal
(up to `TOLERANCE * num_paths_on_edges[(u, v)]`) to the flow value of the graph edges.
"""
if self._solution is None:
self.get_solution()
solution_paths = self._solution.get("_paths_internal", self._solution["paths"])
solution_weights = self._solution["weights"]
if self.discordant_edges_sol is None:
self.discordant_edges_sol = self.solver.get_values(self.discordant_edge_vars)
for (u, v) in self.edge_indexes_basic:
self.discordant_edges_sol[(u, v)] = int(round(self.discordant_edges_sol[(u, v)]))
solution_discordance_expanded = self.discordant_edges_sol
solution_paths_of_edges = [
[(path[i], path[i + 1]) for i in range(len(path) - 1)]
for path in solution_paths
]
weight_from_paths = {(u, v): 0 for (u, v) in self.G.edges()}
num_paths_on_edges = {e: 0 for e in self.G.edges()}
for weight, path in zip(solution_weights, solution_paths_of_edges):
for e in path:
weight_from_paths[e] += weight
num_paths_on_edges[e] += 1
for u, v, data in self.G.edges(data=True):
if self.flow_attr in data and (u,v) not in self.edges_to_ignore:
interval_lb = (1 - self.discordance_tolerance) * data[self.flow_attr]
interval_ub = (1 + self.discordance_tolerance) * data[self.flow_attr]
inside_interval = interval_lb - tolerance <= weight_from_paths[(u, v)] <= interval_ub + tolerance
edge_z = int(round(solution_discordance_expanded[(u, v)]))
if edge_z == 0 and not inside_interval:
utils.logger.debug(
f"{__name__}: Invalid solution for expanded-graph node item ({u}, {v}): z=0 but "
f"weight from paths {weight_from_paths[(u, v)]} is outside "
f"[{interval_lb}, {interval_ub}]"
)
return False
if edge_z == 1 and inside_interval:
utils.logger.debug(
f"{__name__}: Invalid solution for expanded-graph node item ({u}, {v}): z=1 but "
f"weight from paths {weight_from_paths[(u, v)]} is inside "
f"[{interval_lb}, {interval_ub}]"
)
return False
if abs(self.get_objective_value() - self.solver.get_objective_value()) > tolerance * self.original_k:
utils.logger.debug(
f"{__name__}: Invalid solution: objective value {self.get_objective_value()} != solver objective value {self.solver.get_objective_value()} (tolerance: {tolerance * self.original_k})"
)
return False
return True
|