k-Minimum Discordant Nodes in General Graphs

This is the cyclic-graph variant of k-Minimum Discordant Nodes, inspired by the transcript-assembly formulation introduced by Zhao et al. (MultiTrans, 2022). It finds \(k\) weighted source-to-sink walks that minimize the number of discordant nodes.

1. Definition

The k-Minimum Discordant Nodes problem on a directed graph, possibly with cycles, is defined as follows.

  • INPUT:

    • A directed graph \(G = (V,E)\) with non-negative observed flow values \(f(v)\) on nodes.
    • A tolerance value \(\tau \geq 0\).
    • \(k \in \mathbb{Z}_+\).
  • OUTPUT: A list of \(k\) walks \(W_1,\dots,W_k\) and corresponding non-negative weights \(w_1,\dots,w_k\), minimizing the number of discordant nodes.

For each node \(v\), let \(W_i(v)\) denote the number of occurrences of \(v\) in walk \(W_i\). The explained flow at node \(v\) is

$$ \widehat{f}(v) = \sum_{i=1}^{k} w_i \cdot W_i(v). $$

Node \(v\) is discordant if

$$ \widehat{f}(v) \notin \left[(1-\tau)f(v),\ (1+\tau)f(v)\right]. $$

The objective is to minimize the number of discordant nodes.

For example, a cyclic input graph can be:

flowchart LR
    s((s:5))
    a((a:5))
    b((b:5))
    t((t:5))
    s --> a
    a --> b
    b --> a
    b --> t
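
To make the definition concrete, the following minimal sketch computes the explained flow and the discordant nodes for hand-picked walks on the example graph above. It uses plain Python only; the helper names `explained_flow` and `discordant_nodes` are illustrative and not part of flowpaths.

```python
from collections import Counter

def explained_flow(walks, weights):
    """Return {v: sum_i w_i * W_i(v)}, counting every occurrence of v in a walk."""
    total = Counter()
    for walk, w in zip(walks, weights):
        for v in walk:
            total[v] += w
    return dict(total)

def discordant_nodes(flow, walks, weights, tau):
    """Return the set of nodes whose explained flow falls outside the tolerance interval."""
    fhat = explained_flow(walks, weights)
    return {
        v for v, f in flow.items()
        if not (1 - tau) * f <= fhat.get(v, 0) <= (1 + tau) * f
    }

# Node flows of the example graph (s, a, b, t all carry flow 5).
flow = {"s": 5, "a": 5, "b": 5, "t": 5}

# One walk that goes around the cycle a -> b -> a once, with weight 2.5.
walks = [["s", "a", "b", "a", "b", "t"]]
weights = [2.5]

print(explained_flow(walks, weights))
print(discordant_nodes(flow, walks, weights, tau=0.1))
```

In this walk, a and b are visited twice, so each receives an explained flow of 2 × 2.5 = 5, while s and t receive only 2.5 and are therefore discordant for \(\tau = 0.1\).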

2. Solving the problem

import flowpaths as fp
import networkx as nx

# Node-weighted graph with the cycle a -> b -> a; flows are stored in the "flow" node attribute.
G = nx.DiGraph()
G.add_node("s", flow=5)
G.add_node("a", flow=5)
G.add_node("b", flow=5)
G.add_node("t", flow=5)
G.add_edge("s", "a")
G.add_edge("a", "b")
G.add_edge("b", "a")
G.add_edge("b", "t")

model = fp.kMinDiscordantNodesCycles(
    G=G,
    flow_attr="flow",
    k=2,
    discordance_tolerance=0.1,
)
model.solve()

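if model.is_solved():
    sol = model.get_solution(remove_empty_walks=True)
    print(sol["walks"])             # walks over the original graph nodes
    print(sol["weights"])           # one weight per walk
    print(sol["discordant_nodes"])  # dict: node -> 1 if discordant, 0 otherwise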

3. References

  1. Jin Zhao, Haodi Feng, Daming Zhu, Yu Lin, MultiTrans: An Algorithm for Path Extraction Through Mixed Integer Linear Programming for Transcriptome Assembly, IEEE/ACM Transactions on Computational Biology and Bioinformatics 19(1), 48-56, 2022.

  2. See also flowpaths References.

kMinDiscordantNodesCycles

kMinDiscordantNodesCycles(
    G: DiGraph,
    flow_attr: str,
    k: int,
    weight_type: type = float,
    discordance_tolerance: float = 0.1,
    subset_constraints: list = [],
    additional_starts: list = [],
    additional_ends: list = [],
    optimization_options: dict = None,
    solver_options: dict = {},
)

Bases: AbstractWalkModelDiGraph

This class implements the k-MinDiscordantNodes problem on general directed graphs, possibly with cycles. It looks for a decomposition of a node-weighted graph into \(k\) weighted walks, minimizing the number of discordant nodes. A node is discordant if the sum of the weights of the walks traversing it, counted once per visit, lies outside the interval

[(1 - discordance_tolerance) * observed_flow,
 (1 + discordance_tolerance) * observed_flow].

Additionally, it is required that every node and every edge appear in some solution walk.
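
The coverage requirement can be checked independently of the solver. The sketch below is a plain-Python illustration (the helper `uncovered` is hypothetical, not a flowpaths function) that reports which nodes and edges no walk traverses.

```python
def uncovered(nodes, edges, walks):
    """Return (nodes, edges) of the graph that do not appear in any walk."""
    seen_nodes = {v for walk in walks for v in walk}
    seen_edges = {(u, v) for walk in walks for u, v in zip(walk, walk[1:])}
    return set(nodes) - seen_nodes, set(edges) - seen_edges

nodes = ["s", "a", "b", "t"]
edges = [("s", "a"), ("a", "b"), ("b", "a"), ("b", "t")]

# A single acyclic walk misses the back edge (b, a) ...
print(uncovered(nodes, edges, [["s", "a", "b", "t"]]))
# ... while a walk that goes around the cycle once covers everything.
print(uncovered(nodes, edges, [["s", "a", "b", "a", "b", "t"]]))
```

On this small graph, any source-to-sink walk that uses the edge (b, a) has to visit a and b more than once, which is why the cyclic variant works with walks rather than simple paths.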

Parameters

  • G: nx.DiGraph

    The input directed graph, as networkx DiGraph, which can have cycles.

  • flow_attr: str

    The name of the node attribute of G that stores the observed flow values.

  • k: int

    The number of walks in the decomposition.

  • weight_type: int | float, optional

    The type of the weights (int or float). Default is float.

  • discordance_tolerance: float, optional

    The relative tolerance that defines the interval above. Default is 0.1.

  • subset_constraints: list, optional

    List of subset constraints. Default is an empty list.

    In this node-weighted model, constraints are expected on original graph nodes, i.e. each constraint is a list/set of node names that must be covered by some solution walk (order-independent, set semantics).

    These are converted to expanded-graph edges via NodeExpandedDiGraph (node v becomes expanded edge (v.0, v.1)) before encoding.

  • additional_starts: list, optional

    List of additional start nodes of the walks. Default is an empty list.

  • additional_ends: list, optional

    List of additional end nodes of the walks. Default is an empty list.

  • optimization_options: dict, optional

    Dictionary with optimization options. Default is None.

  • solver_options: dict, optional

    Dictionary with solver options. Default is {}.
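
The conversion of node-based subset constraints to expanded-graph edges can be pictured as follows. This is an illustrative sketch only: it mimics the splitting of node v into the edge (v.0, v.1) using plain strings, and the helper names as well as the exact naming of expanded nodes are assumptions rather than the actual NodeExpandedDiGraph implementation.

```python
def expand_nodes(node_flow, edges):
    """Split every node v into an edge (v.0, v.1) that carries v's flow."""
    node_edges = {(f"{v}.0", f"{v}.1"): f for v, f in node_flow.items()}
    # Each original edge (u, v) connects the "out" copy of u to the "in" copy of v.
    connector_edges = [(f"{u}.1", f"{v}.0") for u, v in edges]
    return node_edges, connector_edges

def expand_subset_constraint(constraint):
    """Map a set of original nodes to the expanded edges that represent them."""
    return {(f"{v}.0", f"{v}.1") for v in constraint}

node_flow = {"s": 5, "a": 5, "b": 5, "t": 5}
edges = [("s", "a"), ("a", "b"), ("b", "a"), ("b", "t")]

node_edges, connector_edges = expand_nodes(node_flow, edges)
print(node_edges[("a.0", "a.1")])
print(connector_edges)
print(expand_subset_constraint({"a", "t"}))
```

In this sketch only the node edges carry an observed flow; the connector edges exist purely to preserve the adjacency of the original graph.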

Raises

  • ValueError

    • If the input graph G has no nodes.
    • If weight_type is not int or float.
    • If the flow attribute flow_attr is not specified in some edge.
    • If the graph contains edges with negative flow values.
Source code in flowpaths/kmindiscordantnodescycles.py
def __init__(
    self,
    G: nx.DiGraph,
    flow_attr: str,
    k: int,
    weight_type: type = float,
    discordance_tolerance: float = 0.1,
    subset_constraints: list = [],
    additional_starts: list = [],
    additional_ends: list = [],
    optimization_options: dict = None,
    solver_options: dict = {},
):
    """
    This class implements the k-MinDiscordantNodes problem on general directed graphs,
    possibly with cycles. It looks for a decomposition of a weighted graph into
    $k$ weighted walks, minimizing the number of *discordant* nodes. A node is
    *discordant* if the sum of the walk-weights traversing it lies outside the interval

        [(1 - discordance_tolerance) * observed_flow,
         (1 + discordance_tolerance) * observed_flow].

    Additionally, it is required that every node and every edge appear in some
    solution walk.

    Parameters
    ----------
    - `G: nx.DiGraph`

        The input directed graph, as [networkx DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html),
        which can have cycles.

    - `k: int`

        The number of walks to decompose in.

    - `weight_type: int | float`, optional

        The type of the weights (`int` or `float`). Default is `float`.

    - `subset_constraints: list`, optional

        List of subset constraints. Default is an empty list.

        In this node-weighted model, constraints are expected on original graph nodes,
        i.e. each constraint is a list/set of node names that must be covered by some
        solution walk (order-independent, set semantics).

        These are converted to expanded-graph edges via `NodeExpandedDiGraph`
        (node `v` becomes expanded edge `(v.0, v.1)`) before encoding.

    - `additional_starts: list`, optional

        List of additional start nodes of the walks. Default is an empty list.

    - `additional_ends: list`, optional

        List of additional end nodes of the walks. Default is an empty list.

    - `optimization_options: dict`, optional

        Dictionary with optimization options. Default is `None`.

    - `solver_options: dict`, optional

        Dictionary with solver options. Default is `{}`.

    Raises
    ------
    - `ValueError`

        - If `weight_type` is not `int` or `float`.
        - If the flow attribute `flow_attr` is not specified in some edge.
        - If the graph contains edges with negative flow values.
    """

    utils.logger.info(f"{__name__}: START initialized with graph id = {utils.fpid(G)}, k = {k}")

    if G.number_of_nodes() == 0:
        utils.logger.error(f"{__name__}: The input graph G has no nodes. Please provide a graph with at least one node.")
        raise ValueError("The input graph G has no nodes. Please provide a graph with at least one node.")

    self.G_internal = nedg.NodeExpandedDiGraph(G, node_flow_attr=flow_attr)
    subset_constraints_expanded = self.G_internal.get_expanded_subpath_constraints(subset_constraints)
    additional_starts_internal = self.G_internal.get_expanded_additional_starts(additional_starts)
    additional_ends_internal = self.G_internal.get_expanded_additional_ends(additional_ends)

    edges_to_ignore_internal = self.G_internal.edges_to_ignore

    self.G = stdigraph.stDiGraph(
        self.G_internal,
        additional_starts=additional_starts_internal,
        additional_ends=additional_ends_internal,
    )
    self.subset_constraints = subset_constraints_expanded
    self.edges_to_ignore = self.G.source_sink_edges.union(edges_to_ignore_internal)
    # Nodes/edges are required to be covered, so all expanded-graph edges are safe to trust.
    self.trusted_edges_for_safety = self.G_internal.edges()

    if weight_type not in [int, float]:
        utils.logger.error(f"{__name__}: weight_type must be either int or float, not {weight_type}")
        raise ValueError(f"weight_type must be either int or float, not {weight_type}")
    self.weight_type = weight_type

    self.k = k
    self.original_k = k
    self.optimization_options = optimization_options or {}
    self.discordance_tolerance = discordance_tolerance

    self.flow_attr = flow_attr
    self.w_max = self.k * self.weight_type(
        self.G.get_max_flow_value_and_check_non_negative_flow(
            flow_attr=self.flow_attr,
            edges_to_ignore=self.edges_to_ignore,
        )
    )

    self.pi_vars = {}
    self.path_weights_vars = {}
    self.discordant_edge_vars = {}
    self.discordant_low_vars = {}
    self.discordant_high_vars = {}

    self.path_weights_sol = None
    self.discordant_edges_sol = None
    self.discordant_nodes_sol = None
    self._solution = None
    self._lowerbound_k = None

    self.solve_statistics = {}
    self.solve_time_start = time.perf_counter()

    self.optimization_options["trusted_edges_for_safety"] = set(self.trusted_edges_for_safety)

    super().__init__(
        G=self.G,
        k=self.k,
        max_edge_repetition_dict=self.G.compute_edge_max_reachable_value(flow_attr=self.flow_attr),
        subset_constraints=self.subset_constraints,
        subset_constraints_coverage=1.0,
        optimization_options=self.optimization_options,
        solver_options=solver_options,
        solve_statistics=self.solve_statistics,
    )

    # Called from the walk-model parent class.
    self.create_solver_and_walks()

    # Called from this class to encode objective and extra constraints.
    self._encode_discordance_decomposition()
    self._encode_cover_every_node_edge_constraints()
    self._encode_objective()

    utils.logger.info(f"{__name__}: END initialized with graph id = {utils.fpid(G)}, k = {self.k}")

get_solution

get_solution(
    remove_empty_walks=True,
)

Retrieves the solution for the discordance minimization problem.

Source code in flowpaths/kmindiscordantnodescycles.py
def get_solution(self, remove_empty_walks=True):
    """
    Retrieves the solution for the discordance minimization problem.
    """

    if self._solution is not None:
        return self._remove_empty_walks(self._solution) if remove_empty_walks else self._solution

    self.check_is_solved()

    weights_sol_dict = self.solver.get_values(self.path_weights_vars)

    self.path_weights_sol = [
        round(weights_sol_dict[i]) if self.weight_type == int else float(weights_sol_dict[i])
        for i in range(self.k)
    ]

    self.discordant_edges_sol = self.solver.get_values(self.discordant_edge_vars)
    for (u, v) in self.edge_indexes_basic:
        self.discordant_edges_sol[(u, v)] = int(round(self.discordant_edges_sol[(u, v)]))

    self.discordant_nodes_sol = {}
    for expanded_edge, z_value in self.discordant_edges_sol.items():
        condensed_node = self.G_internal.get_condensed_node(expanded_edge)
        if condensed_node is not None:
            self.discordant_nodes_sol[condensed_node] = z_value
        else:
            utils.logger.error(
                f"{__name__}: Error: expanded edge {expanded_edge} does not correspond to any condensed node. This should not happen."
            )

    self._solution = {
        "_walks_internal": self.get_solution_walks(),
        "walks": self.G_internal.get_condensed_paths(self.get_solution_walks()),
        "weights": self.path_weights_sol,
        "discordant_nodes": self.discordant_nodes_sol,
    }

    return self._remove_empty_walks(self._solution) if remove_empty_walks else self._solution
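
As the source above shows, the returned dictionary has the keys `walks`, `weights` and `discordant_nodes` (plus the internal `_walks_internal`), where `discordant_nodes` maps each node to a 0/1 label. The snippet below works on a hand-written dictionary of that shape, not on real solver output, and shows one way to summarize it.

```python
# Hand-written example in the shape returned by get_solution(); not solver output.
sol = {
    "walks": [["s", "a", "b", "t"], ["s", "a", "b", "a", "b", "t"]],
    "weights": [4.0, 1.0],
    "discordant_nodes": {"s": 0, "a": 1, "b": 1, "t": 0},
}

# Nodes labeled 1 are discordant; their count is the quantity being minimized.
discordant = sorted(v for v, z in sol["discordant_nodes"].items() if z == 1)
num_discordant = len(discordant)

for walk, weight in zip(sol["walks"], sol["weights"]):
    print(f"weight {weight}: {' -> '.join(walk)}")
print(f"{num_discordant} discordant node(s): {discordant}")
```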

is_valid_solution

is_valid_solution(
    tolerance=0.001,
)

Checks if the solution is valid by comparing walk-induced values and discordance labels.

Source code in flowpaths/kmindiscordantnodescycles.py
def is_valid_solution(self, tolerance=0.001):
    """
    Checks if the solution is valid by comparing walk-induced values and discordance labels.
    """

    if self._solution is None:
        self.get_solution()

    solution_walks = self._solution.get("_walks_internal", self._solution["walks"])
    solution_weights = self._solution["weights"]

    if self.discordant_edges_sol is None:
        self.discordant_edges_sol = self.solver.get_values(self.discordant_edge_vars)
        for (u, v) in self.edge_indexes_basic:
            self.discordant_edges_sol[(u, v)] = int(round(self.discordant_edges_sol[(u, v)]))

    solution_discordance_expanded = self.discordant_edges_sol
    solution_walks_of_edges = [
        [(walk[i], walk[i + 1]) for i in range(len(walk) - 1)]
        for walk in solution_walks
    ]

    weight_from_walks = {(u, v): 0 for (u, v) in self.G.edges()}
    for weight, walk in zip(solution_weights, solution_walks_of_edges):
        for e in walk:
            weight_from_walks[e] += weight

    for u, v, data in self.G.edges(data=True):
        if self.flow_attr in data and (u, v) not in self.edges_to_ignore:
            interval_lb = (1 - self.discordance_tolerance) * data[self.flow_attr]
            interval_ub = (1 + self.discordance_tolerance) * data[self.flow_attr]
            inside_interval = interval_lb - tolerance <= weight_from_walks[(u, v)] <= interval_ub + tolerance
            edge_z = int(round(solution_discordance_expanded[(u, v)]))

            if edge_z == 0 and not inside_interval:
                utils.logger.debug(
                    f"{__name__}: Invalid solution for expanded-graph node item ({u}, {v}): z=0 but "
                    f"weight from walks {weight_from_walks[(u, v)]} is outside "
                    f"[{interval_lb}, {interval_ub}]"
                )
                return False
            if edge_z == 1 and inside_interval:
                utils.logger.debug(
                    f"{__name__}: Invalid solution for expanded-graph node item ({u}, {v}): z=1 but "
                    f"weight from walks {weight_from_walks[(u, v)]} is inside "
                    f"[{interval_lb}, {interval_ub}]"
                )
                return False

    if abs(self.get_objective_value() - self.solver.get_objective_value()) > tolerance * self.original_k:
        utils.logger.debug(
            f"{__name__}: Invalid solution: objective value {self.get_objective_value()} != solver objective value {self.solver.get_objective_value()} (tolerance: {tolerance * self.original_k})"
        )
        return False

    return True
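
The core of this check, namely that a node is labeled concordant exactly when its explained flow lies in the tolerance interval (up to a numerical slack), can be restated on the original nodes. The following is a simplified sketch under that reading; `labels_consistent` is a hypothetical helper, and it omits the objective-value comparison performed by the method.

```python
def labels_consistent(flow, walks, weights, z, tau, tol=1e-3):
    """Check that z[v] == 0 exactly when the explained flow of v is inside its interval."""
    explained = {v: 0.0 for v in flow}
    for walk, w in zip(walks, weights):
        for v in walk:  # repeated visits add the weight again
            explained[v] += w

    for v, f in flow.items():
        lb, ub = (1 - tau) * f, (1 + tau) * f
        inside = lb - tol <= explained[v] <= ub + tol
        if (z[v] == 0) != inside:
            return False
    return True

flow = {"s": 5, "a": 5, "b": 5, "t": 5}
walks = [["s", "a", "b", "t"], ["s", "a", "b", "a", "b", "t"]]
weights = [4.0, 1.0]

# a and b receive 4 + 2 * 1 = 6, outside [4.5, 5.5], so they must be labeled 1.
z = {"s": 0, "a": 1, "b": 1, "t": 0}
print(labels_consistent(flow, walks, weights, z, tau=0.1))
```

The slack `tol` plays the same role as the `tolerance` argument of the method: it keeps floating-point noise at the interval boundaries from being reported as an inconsistency.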