Skip to content

Minimum-Paths Minimum Discordant Nodes

This model optimizes the number of paths for the k-Minimum Discordant Nodes objective in DAGs. It wraps NumPathsOptimization with:

  • model_type = kMinDiscordantNodes
  • stop_on_delta_abs = 0

Thus, the routine iterates over increasing values of \(k\) and stops when the objective value no longer improves in absolute value (plateau criterion).

1. Definition

Given a DAG with node flow values, this model returns a decomposition minimizing the number of discordant nodes while selecting \(k\) automatically.

The search starts at $$ k_0 = \max\left(\text{min_num_paths},\ \text{lowerbound}(k)\right) $$ and increases \(k\) until one of the stopping conditions in NumPathsOptimization is met.

2. Solving the problem

import flowpaths as fp
import networkx as nx

G = nx.DiGraph()
G.add_node("s", flow=5)
G.add_node("a", flow=10)
G.add_node("t", flow=5)
G.add_edge("s", "a")
G.add_edge("a", "t")

model = fp.MinPathsMinDiscordantNodes(
    G=G,
    flow_attr="flow",
    discordance_tolerance=0.1,
    min_num_paths=1,
    max_num_paths=10,
)
model.solve()

if model.is_solved():
    sol = model.get_solution(remove_empty_paths=True)
    print(model.model.k)  # selected k
    print(sol["paths"])
    print(sol["weights"])
    print(sol["discordant_nodes"])

Example search flow:

flowchart LR
    start([Start at k0]) --> solvek[Solve k-MinDiscordantNodes at k]
    solvek --> feasible{Solved?}
    feasible -- no --> nextk[Increase k]
    feasible -- yes --> plateau{Objective plateau?}
    plateau -- no --> nextk
    plateau -- yes --> stop([Return previous feasible model])
    nextk --> solvek

3. References

  1. Jin Zhao, Haodi Feng, Daming Zhu, Yu Lin, MultiTrans: An Algorithm for Path Extraction Through Mixed Integer Linear Programming for Transcriptome Assembly, IEEE/ACM Transactions on Computational Biology and Bioinformatics 19(1), 48-56, 2022.

  2. See also flowpaths References.

MinPathsMinDiscordantNodes

MinPathsMinDiscordantNodes(
    G: DiGraph,
    flow_attr: str,
    weight_type: type = float,
    discordance_tolerance: float = 0.1,
    subsequence_constraints: list = None,
    additional_starts: list = None,
    additional_ends: list = None,
    optimization_options: dict = None,
    solver_options: dict = None,
    round_flow_values_to_int: bool = True,
    flow_values_divisor: float = 1,
    min_num_paths: int = None,
    max_num_paths: int = 2
    ** 64,
    time_limit: float = float(
        "inf"
    ),
)

Bases: NumPathsOptimization

Minimize the number of paths for k-MinDiscordantNodes on DAG-like inputs.

The class wraps :class:NumPathsOptimization with: - model_type fixed to kMinDiscordantNodes - stop_on_delta_abs fixed to 0

This means the search over k stops at the first plateau, i.e. when the objective no longer improves in absolute value.

Multi-component behavior

If G has multiple weakly connected components, this class solves one component-local instance per component and merges the solutions. Constraints and optional start/end nodes are filtered per component. A single global time_limit is enforced across all component solves.

Build a MinPathsMinDiscordantNodes optimizer.

Parameters

G : nx.DiGraph Input graph. flow_attr : str Node/edge attribute name used by the wrapped discordance model. weight_type : type, default=float Type for path weights (typically int or float). discordance_tolerance : float, default=0.1 Relative tolerance used when deciding if a node is discordant. subsequence_constraints : list, optional List of node sequences that must be covered. In multi-component mode, each sequence must lie entirely inside one component. additional_starts : list, optional Optional allowed start nodes. Filtered per component in multi-component mode. additional_ends : list, optional Optional allowed end nodes. Filtered per component in multi-component mode. optimization_options : dict, optional Options forwarded to underlying models. solver_options : dict, optional Solver backend options (HiGHS/Gurobi wrapper options). flow_values_divisor : float, default=1 Divides node/edge flow values before solving. Division happens before optional rounding. min_num_paths : int, optional Lower bound for k. If None, a lower bound is computed via MinPathCover on a node-expanded graph (per component when componentized). max_num_paths : int, default=2**64 Upper bound for k. time_limit : float, default=inf Global time limit in seconds. For multi-component graphs, each component receives the remaining budget.

Source code in flowpaths/minpathsmindiscordantnodes.py
def __init__(
    self,
    G: nx.DiGraph,
    flow_attr: str,
    weight_type: type = float,
    discordance_tolerance: float = 0.1,
    subsequence_constraints: list = None,
    additional_starts: list = None,
    additional_ends: list = None,
    optimization_options: dict = None,
    solver_options: dict = None,
    round_flow_values_to_int: bool = True,
    flow_values_divisor: float = 1,
    min_num_paths: int = None,
    max_num_paths: int = 2**64,
    time_limit: float = float("inf"),
):
    """
    Build a MinPathsMinDiscordantNodes optimizer.

    Parameters
    ----------
    G : nx.DiGraph
        Input graph.
    flow_attr : str
        Node/edge attribute name used by the wrapped discordance model.
    weight_type : type, default=float
        Type for path weights (typically ``int`` or ``float``).
    discordance_tolerance : float, default=0.1
        Relative tolerance used when deciding if a node is discordant.
    subsequence_constraints : list, optional
        List of node sequences that must be covered. In multi-component
        mode, each sequence must lie entirely inside one component.
    additional_starts : list, optional
        Optional allowed start nodes. Filtered per component in
        multi-component mode.
    additional_ends : list, optional
        Optional allowed end nodes. Filtered per component in
        multi-component mode.
    optimization_options : dict, optional
        Options forwarded to underlying models.
    solver_options : dict, optional
        Solver backend options (HiGHS/Gurobi wrapper options).
    flow_values_divisor : float, default=1
        Divides node/edge flow values before solving. Division happens
        before optional rounding.
    min_num_paths : int, optional
        Lower bound for ``k``. If ``None``, a lower bound is computed via
        ``MinPathCover`` on a node-expanded graph (per component when
        componentized).
    max_num_paths : int, default=2**64
        Upper bound for ``k``.
    time_limit : float, default=inf
        Global time limit in seconds. For multi-component graphs, each
        component receives the remaining budget.
    """
    self._is_componentized = False
    self._component_graphs = []
    self._component_labels = []
    self._component_models = []
    self._component_solutions = None
    self._component_objective_value = None
    self._logged_cross_component_constraints = set()
    self._constraint_reachability_infeasible = False
    self._path_cover_seed_info = {}
    self.weight_type = weight_type
    self._round_flow_values_to_int = round_flow_values_to_int
    self._flow_values_divisor = self._validate_flow_values_divisor(flow_values_divisor)
    self._original_graph = G.copy()

    self._model_init_kwargs = {
        "flow_attr": flow_attr,
        "weight_type": weight_type,
        "discordance_tolerance": discordance_tolerance,
        "subsequence_constraints": subsequence_constraints or [],
        "additional_starts": additional_starts or [],
        "additional_ends": additional_ends or [],
        "optimization_options": optimization_options,
        "solver_options": solver_options or {},
        "round_flow_values_to_int": round_flow_values_to_int,
        "flow_values_divisor": self._flow_values_divisor,
        "min_num_paths": min_num_paths,
        "max_num_paths": max_num_paths,
        "time_limit": time_limit,
    }

    G = self._get_preprocessed_graph_with_scaled_flows(
        G=G,
        flow_attr=flow_attr,
        flow_values_divisor=self._flow_values_divisor,
        round_flow_values_to_int=round_flow_values_to_int,
    )

    if not self._validate_subsequence_constraint_reachability(
        graph=G,
        constraints=self._model_init_kwargs["subsequence_constraints"],
    ):
        self._constraint_reachability_infeasible = True
        return

    weak_components = list(nx.weakly_connected_components(G))
    if len(weak_components) > 1:
        self._is_componentized = True
        utils.logger.info(
            f"{__name__}: Detected {len(weak_components)} weakly connected components; solving them independently.",
        )
        for component_index, nodes in enumerate(weak_components, start=1):
            component_graph = G.subgraph(nodes).copy()
            component_name = self._set_component_graph_component_suffix(
                component_graph=component_graph,
                parent_graph=G,
                component_index=component_index,
                total_components=len(weak_components),
            )
            self._component_graphs.append(component_graph)
            self._component_labels.append(component_name)
            utils.logger.info(
                f"{__name__}: Prepared component {component_index}/{len(weak_components)} as '{component_name}' with {component_graph.number_of_nodes()} nodes and {component_graph.number_of_edges()} edges.",
            )
        super().__init__(
            model_type=kmindiscordantnodes.kMinDiscordantNodes,
            stop_on_delta_abs=0,
            min_num_paths=(1 if min_num_paths is None else min_num_paths),
            max_num_paths=max_num_paths,
            time_limit=time_limit,
            G=G,
            flow_attr=flow_attr,
            weight_type=weight_type,
            discordance_tolerance=discordance_tolerance,
            subsequence_constraints=subsequence_constraints or [],
            additional_starts=additional_starts or [],
            additional_ends=additional_ends or [],
            optimization_options=optimization_options,
            solver_options=solver_options or {},
        )
        return

    if min_num_paths is None:
        min_num_paths_lb, seed_paths = self._compute_min_path_cover_lower_bound_and_seed(
            G=G,
            subsequence_constraints=subsequence_constraints or [],
            additional_starts=additional_starts or [],
            additional_ends=additional_ends or [],
            optimization_options=optimization_options or {},
            solver_options=solver_options or {},
        )
    else:
        min_num_paths_lb = min_num_paths
        seed_paths = None

    merged_optimization_options = self._get_optimization_options_with_path_cover_seed(
        optimization_options=optimization_options,
        seed_paths=seed_paths,
    )
    self._model_init_kwargs["optimization_options"] = merged_optimization_options

    super().__init__(
        model_type=kmindiscordantnodes.kMinDiscordantNodes,
        stop_on_delta_abs=0,
        min_num_paths=max(min_num_paths_lb, 1),
        max_num_paths=max_num_paths,
        time_limit=time_limit,
        G=G,
        flow_attr=flow_attr,
        weight_type=weight_type,
        discordance_tolerance=discordance_tolerance,
        subsequence_constraints=subsequence_constraints or [],
        additional_starts=additional_starts or [],
        additional_ends=additional_ends or [],
        optimization_options=merged_optimization_options,
        solver_options=solver_options or {},
    )

get_lowerbound_k

get_lowerbound_k()

Return lower bound used for the k search.

Source code in flowpaths/minpathsmindiscordantnodes.py
def get_lowerbound_k(self):
    """Return lower bound used for the ``k`` search."""
    if not self._is_componentized:
        return super().get_lowerbound_k()

    return sum(component_graph.number_of_nodes() > 0 for component_graph in self._component_graphs)

get_objective_value

get_objective_value()

Return discordant-node count for the postprocessed solution.

Source code in flowpaths/minpathsmindiscordantnodes.py
def get_objective_value(self):
    """Return discordant-node count for the postprocessed solution."""
    self.check_is_solved()
    solution = self.get_solution(remove_empty_paths=False)
    return sum(solution.get("discordant_nodes", {}).values())

get_solution

get_solution(
    remove_empty_paths=True,
)

Return merged solution, optionally removing empty paths.

Source code in flowpaths/minpathsmindiscordantnodes.py
def get_solution(self, remove_empty_paths=True):
    """Return merged solution, optionally removing empty paths."""
    if not self._is_componentized:
        raw_solution = super().get_solution(remove_empty_paths=False)
        self._solution = self._postprocess_solution_for_original_graph(raw_solution)
        return self._remove_empty_sequences(self._solution) if remove_empty_paths else self._solution

    self.check_is_solved()
    if self._solution is None:
        self._solution = self._merge_component_solutions(self._component_solutions or [])

    self._solution = self._postprocess_solution_for_original_graph(self._solution)

    return self._remove_empty_sequences(self._solution) if remove_empty_paths else self._solution

is_valid_solution

is_valid_solution() -> bool

Validate transformed solution against original graph flow values.

Source code in flowpaths/minpathsmindiscordantnodes.py
def is_valid_solution(self) -> bool:
    """Validate transformed solution against original graph flow values."""
    self.check_is_solved()
    solution = self.get_solution(remove_empty_paths=False)
    if solution is None:
        return False

    sequence_key = "paths" if "paths" in solution else "walks"
    sequences = solution.get(sequence_key, [])
    weights = solution.get("weights", [])
    if len(sequences) != len(weights):
        return False

    if self._is_componentized:
        return all(component_model.is_valid_solution() for component_model in self._component_models) and self._is_solution_discordance_consistent(solution)

    return self._is_solution_discordance_consistent(solution)

solve

solve() -> bool

Solve the model.

For a single-component graph, delegates to NumPathsOptimization. For multiple weakly connected components, solves components one by one, consuming a shared global time budget and summing objective values.

Source code in flowpaths/minpathsmindiscordantnodes.py
def solve(self) -> bool:
    """
    Solve the model.

    For a single-component graph, delegates to ``NumPathsOptimization``.
    For multiple weakly connected components, solves components one by one,
    consuming a shared global time budget and summing objective values.
    """
    if self._constraint_reachability_infeasible:
        utils.logger.critical(
            f"{__name__}: Aborting solve because subsequence constraints are infeasible (reachability pre-check failed).",
        )
        self.solve_statistics = {
            "solve_status": numpathsoptimization.NumPathsOptimization.infeasible_status_name,
            "solve_time": 0,
        }
        self._is_solved = False
        return False

    if not self._is_componentized:
        graph = self.kwargs.get("G", self._original_graph)
        if self._solve_trivial_single_node_graph(graph):
            return True

        solved = super().solve()
        self._merge_path_cover_seed_info_into_statistics()
        if not solved:
            self._log_single_component_failure()
        return solved

    self.solve_time_start = time.perf_counter()
    self._component_models = []
    self._component_solutions = []
    self._component_objective_value = 0

    utils.logger.info(
        f"{__name__}: Starting multi-component solve for {len(self._component_graphs)} components with global time_limit={self.time_limit}.",
    )

    for component_index, component_graph in enumerate(self._component_graphs, start=1):
        component_name = self._get_graph_name(component_graph)
        remaining_time = self.time_limit - self.solve_time_elapsed
        if remaining_time <= 0:
            utils.logger.warning(
                f"{__name__}: Time budget exhausted before component {component_index}/{len(self._component_graphs)} ('{component_name}').",
            )
            self.solve_statistics = {
                "solve_status": numpathsoptimization.NumPathsOptimization.timeout_status_name,
                "solve_time": self.solve_time_elapsed,
            }
            self._is_solved = False
            return False

        utils.logger.info(
            f"{__name__}: Solving component {component_index}/{len(self._component_graphs)} ('{component_name}') with remaining_time={remaining_time:.3f}s.",
        )

        trivial_solution = self._get_trivial_zero_flow_component_solution(component_graph)
        if trivial_solution is not None:
            utils.logger.info(
                f"{__name__}: Component '{component_name}' solved via trivial isolated-node shortcut.",
            )
            self._component_solutions.append(trivial_solution)
            continue

        component_model = self._build_component_model(component_graph, remaining_time)
        component_model.solve()
        self._component_models.append(component_model)

        if not component_model.is_solved():
            component_status = None
            if isinstance(component_model.solve_statistics, dict):
                component_status = component_model.solve_statistics.get("solve_status")
            if component_status == numpathsoptimization.NumPathsOptimization.timeout_status_name:
                witness_count = 0
                if hasattr(component_model, "_path_cover_seed_info"):
                    witness_count = len(component_model._path_cover_seed_info.get("seed_paths", []))
                utils.logger.warning(
                    f"{__name__}: Component '{component_name}' timed out while solving the weighted discordance model. Structural path-cover witness paths: {witness_count}.",
                )
            else:
                utils.logger.error(
                    f"{__name__}: Component '{component_name}' failed with solve_status={component_status}.",
                )
            self.solve_statistics = {
                "solve_status": component_status or numpathsoptimization.NumPathsOptimization.infeasible_status_name,
                "solve_time": self.solve_time_elapsed,
                "failed_component_name": component_name,
                "failed_component_index": component_index,
                "failed_component_status": component_status,
            }
            self._is_solved = False
            return False

        component_solution = component_model.get_solution(remove_empty_paths=False)
        self._component_solutions.append(component_solution)
        self._component_objective_value += component_model.get_objective_value()
        utils.logger.info(
            f"{__name__}: Component '{component_name}' solved. Partial global objective={self._component_objective_value}.",
        )

    self._solution = self._merge_component_solutions(self._component_solutions)
    self.solve_statistics = {
        "solve_status": numpathsoptimization.NumPathsOptimization.solved_status_name,
        "solve_time": self.solve_time_elapsed,
    }
    utils.logger.info(
        f"{__name__}: Multi-component solve completed. Total objective={self._component_objective_value}, solve_time={self.solve_time_elapsed:.3f}s.",
    )
    self.set_solved()
    return True