Skip to content

Abstract Path Model in DAGs

A general approach in developing a model to decompose a weighted graph into weighted paths is to:

  1. Fix \(k\), the number of paths.
  2. Formulate in ILP the \(k\) paths; that is adding a set of \(k\) variables and suitable constraints constraints such that the \(i\)-th set of variables encodes the \(i\)-th path.
  3. Add additional variables, constraints, or set the objective function, that these paths must satisfy.
  4. Iterate the above process for different values of \(k\), until the “best” one is found (“best” depends on the problem). See our implementation of a basic routine for this step.

For step 2. above we provide the abstract class AbstractPathModelDAG which models a given number \(k\) of paths in a given acyclic graph \(G = (V,E)\). For simplicity, \(G\) must have a single source \(s\) and a single sink \(t\), see our class stDiGraph. (The stDiGraph class adds these automatically for any NetworkX DiGraph, and keeps track of their incident edges.) This approach appeared in this paper.

More in detail, for every edge \((u,v) \in E\), and for every path index \(i \in \{0,\dots,k-1\}\), we add a binary variable \(x_{u,v,i} \in \{0,1\}\). We add constraints on these variables to ensure that for every \(i\) the variables \(x_{u,v,i}\) that equal 1 induce an \(s\)-\(t\) path (i.e., a path from \(s\) to \(t\)). In other words \(x_{u,v,i} = 1\) if edge \((u,v)\) belongs to solution path \(i\), and 0 otherwise. See the paper on the specific constraints that are added to enforce that they induce an \(s\)-\(t\) path.

For example, the edges in brown below induce an \(s\)-\(t\) path (for say \(i = 3\)), and notice that the \(x_{u,v,3}\) variables equal 1 only on the edges \((u,v)\) on the path.

%%{init: {'themeVariables': { 'edgeLabelBackground': 'white'}}}%%
flowchart LR
    s((s))
    a((a))
    b((b))
    c((c))
    d((d))
    t((t))
    s -->|"$$x_{s,a,3} = 1$$"| a
    a -->|"$$x_{a,b,3} = 1$$"| b
    s -->|"$$x_{s,b,3} = 0$$"| b
    a -->|"$$x_{a,c,3} = 0$$"| c
    b -->|"$$x_{b,c,3} = 1$$"| c
    c -->|"$$x_{c,d,3} = 1$$"| d
    d -->|"$$x_{d,t,3} = 1$$"| t
    c -->|"$$x_{c,t,3} = 0$$"| t

    linkStyle 0,1,4,5,6 stroke:brown,stroke-width:3;

The search for paths

  • Note that we do not have the paths beforehand, and the ILP will search for paths (i.e. assignment of values to the \(x_{u,v,i}\) variables, under the constraints that they induce a path).
  • Once a class inherits from AbstractPathModelDAG, it will add other variables and constraints (as in point 3. above). The ILP solver will then search for the \(k\) paths (i.e. find the values to the \(x_{u,v,i}\) variables) to satisfy all constraints.

Example: Modelling \(k\)-Flow Decomposition

Consider the problem of decomposing a network flow \(f : E \rightarrow \mathbb{N}\) over a DAG \(G = (V,E)\) into a given number \(k\) of \(s\)-\(t\) paths (k-Flow Decomposition). Assume we created the \(x_{u,v,i}\) variables as above. Thus, we just need to implement point 3. above.

  • We introduce a variable \(w_i\) (integer or continuous) modeling the weight of path \(i\), for every \(i \in \{0,\dots,k-1\}\).
  • We need to enforce the “flow explanation” constraint: $$ f(u,v) = \sum_{i=0}^{k-1} x_i \cdot w_i, ~~\forall (u,v) \in E. $$ Note that in the above \(f(u,v)\) is a constant. Moreover, \(x_i \cdot w_i\) is not a linear term (as required by an Integer Linear Program), but it can be easily linearized via additional variables and constraints. However, our SolverWrapper class provides the method add_binary_continuous_product_constraint() to directly encode such a non-linear constraint, without bothering to manually set up these additional variables and constraints.

The \(x_{u,v,i}\) variables are implemented as edge_vars[(u, v, i)], see the class documentation below.

AbstractWalkModelDiGraph

AbstractWalkModelDiGraph(
    G: stDiGraph,
    k: int,
    max_edge_repetition: int = 1,
    max_edge_repetition_dict: dict = None,
    subset_constraints: list = [],
    subset_constraints_coverage: float = 1,
    optimization_options: dict = None,
    solver_options: dict = {},
    solve_statistics: dict = {},
)

Bases: ABC

Parameters

  • G: stdigraph.stDiGraph

    The directed graph to be used, possibly with cycles. Create it using the stDiGraph class.

  • k: int

    The number of s-t walks to be modeled.

  • max_edge_repetition: int, optional

    The maximum number of times an edge can be used in a walk. Defaults to 1.

  • max_edge_repetition_dict: dict, optional

    A per-edge upper bound mapping that overrides `max_edge_repetition` on a per-edge basis.
    Keys are edges `(u, v)` from `G.edges()` and values are non-negative integers specifying
    the maximum number of times that edge can be used within a single walk (layer).
    
    Requirements and behavior:
    - If provided, it must contain an entry for every edge in `G` (missing entries raise `ValueError`).
    - Values should be integers ≥ 0; a value of 0 forbids the edge in any walk.
    - When both `max_edge_repetition` and this dict are provided, the dict takes precedence per edge.
    - These bounds are applied per layer i and are used to set the variable upper bounds and related
        big-M constants in the model.
    
  • subset_constraints: list, optional

    A list of lists, where each list is a set of edges (not necessarily contiguous). Defaults to an empty list.

    Each set of edges must appear in at least one solution path; if you also pass subset_constraints_coverage, then each set of edges must appear in at least subset_constraints_coverage fraction of some solution walk, see below.

  • subset_constraints_coverage: float, optional

    Coverage fraction of the subset constraints that must be covered by some solution walk, in terms of number of edges. - Defaults to 1 (meaning that 100% of the edges of the constraint need to be covered by some solution walk).

  • optimization_options: dict, optional

    Dictionary of optimization options. Defaults to None, in which case the default values are used. See the available optimizations. If you pass any safety optimizations, you must also pass the dict entry "trusted_edges_for_safety" (see below). If a child class has already solved the problem and has the solution paths, it can pass them via the dict entry "external_solution_paths" to skip the solver creation and encoding of paths (see below).

    • "trusted_edges_for_safety": set

      Set of trusted edges for safety. Defaults to None.

      Global optimality

      In order for the optimizations to still guarantee a global optimum, you must guarantee that:

      1. The solution is made up of source-to-sink walks, and
      2. Every edge in trusted_edges_for_safety appears in some solution walk, for all solutions. This naturally holds for several problems, for example Minimum Flow Decomposition or k-Minimum Path Error where in fact, under default settings, all edges appear in all solutions.
  • solver_options: dict, optional

    Dictionary of solver options. Defaults to {}, in which case the default values are used. See the available solver options.

  • solve_statistics: dict, optional

    Dictionary to store solve statistics. Defaults to {}.

Raises

  • ValueError: If trusted_edges_for_safety is not provided when optimizing with optimize_with_safe_sequences.
  • ValueError: If max_edge_repetition_dict is provided but is missing any edge in G.
Source code in flowpaths/abstractwalkmodeldigraph.py
def __init__(
    self,
    G: stdigraph.stDiGraph,
    k: int,
    max_edge_repetition: int = 1,
    max_edge_repetition_dict: dict = None,
    subset_constraints: list = [],
    subset_constraints_coverage: float = 1,
    optimization_options: dict = None,
    solver_options: dict = {},
    solve_statistics: dict = {},
):
    """
    Parameters
    ----------

    - `G: stdigraph.stDiGraph`  

        The directed graph to be used, possibly with cycles. Create it using the [`stDiGraph` class](stdigraph.md).

    - `k: int`

        The number of s-t walks to be modeled.

    - `max_edge_repetition: int`, optional

        The maximum number of times an edge can be used in a walk. Defaults to 1.

    - `max_edge_repetition_dict: dict`, optional

            A per-edge upper bound mapping that overrides `max_edge_repetition` on a per-edge basis.
            Keys are edges `(u, v)` from `G.edges()` and values are non-negative integers specifying
            the maximum number of times that edge can be used within a single walk (layer).

            Requirements and behavior:
            - If provided, it must contain an entry for every edge in `G` (missing entries raise `ValueError`).
            - Values should be integers ≥ 0; a value of 0 forbids the edge in any walk.
            - When both `max_edge_repetition` and this dict are provided, the dict takes precedence per edge.
            - These bounds are applied per layer i and are used to set the variable upper bounds and related
                big-M constants in the model.

    - `subset_constraints: list`, optional

        A list of lists, where each list is a *set* of edges (not necessarily contiguous). Defaults to an empty list.

        Each set of edges must appear in at least one solution path; if you also pass `subset_constraints_coverage`, 
        then each set of edges must appear in at least `subset_constraints_coverage` fraction of some solution walk, see below.

    - `subset_constraints_coverage: float`, optional

        Coverage fraction of the subset constraints that must be covered by some solution walk, in terms of number of edges. 
            - Defaults to 1 (meaning that 100% of the edges of the constraint need to be covered by some solution walk).

    - `optimization_options: dict`, optional 

        Dictionary of optimization options. Defaults to `None`, in which case the default values are used. See the [available optimizations](solver-options-optimizations.md). 
        If you pass any safety optimizations, you must also pass the dict entry `"trusted_edges_for_safety"` (see below). 
        If a child class has already solved the problem and has the solution paths, it can pass them via the dict entry `"external_solution_paths"` to skip the solver creation and encoding of paths (see below).

        - `"trusted_edges_for_safety": set`

            Set of trusted edges for safety. Defaults to `None`.

            !!! warning "Global optimality"
                In order for the optimizations to still guarantee a global optimum, you must guarantee that:

                1. The solution is made up of source-to-sink walks, and
                2. Every edge in `trusted_edges_for_safety` appears in some solution walk, for all solutions. This naturally holds for several problems, for example [Minimum Flow Decomposition](minimum-flow-decomposition-cycles.md) or [k-Minimum Path Error](k-min-path-error-cycles.md) where in fact, under default settings, **all** edges appear in all solutions.

    - `solver_options: dict`, optional

        Dictionary of solver options. Defaults to `{}`, in which case the default values are used. 
        See the [available solver options](solver-options-optimizations.md).

    - `solve_statistics: dict`, optional

        Dictionary to store solve statistics. Defaults to `{}`.


    Raises
    ----------

    - ValueError: If `trusted_edges_for_safety` is not provided when optimizing with `optimize_with_safe_sequences`.
    - ValueError: If `max_edge_repetition_dict` is provided but is missing any edge in `G`.
    """

    self.G = G
    if G.number_of_edges() == 0:
        utils.logger.error(f"{__name__}: The input graph G has no edges. Please provide a graph with at least one edge.")
        raise ValueError(f"The input graph G has no edges. Please provide a graph with at least one edge.")
    self.id = self.G.id
    self.k = k
    if k <= 0:
        utils.logger.error(f"{__name__}: k must be positive, got {k}.")
        raise ValueError(f"k must be positive, got {k}.")
    if max_edge_repetition_dict is None:
        self.edge_upper_bounds = {edge: max_edge_repetition for edge in self.G.edges()}
    else:
        self.edge_upper_bounds = max_edge_repetition_dict
        for edge in self.G.edges():
            if edge not in self.edge_upper_bounds:
                utils.logger.critical(f"{__name__}: Missing max_edge_repetition in max_edge_repetition_dict for edge {edge}")
                raise ValueError(f"Missing max_edge_repetition for edge {edge}")
    # We set to 1 in edge_upper_bounds if the edge is not inside an SCC of self.G,
    # because these edges cannot be traversed more than 1 times by any walk
    for edge in self.G.edges():
        if not self.G.is_scc_edge(edge[0], edge[1]):
            self.edge_upper_bounds[edge] = 1

    self.subset_constraints = copy.deepcopy(subset_constraints)
    if self.subset_constraints is not None:
        self._check_valid_subset_constraints()

    self.subset_constraints_coverage = subset_constraints_coverage
    if len(subset_constraints) > 0:
        if self.subset_constraints_coverage <= 0 or self.subset_constraints_coverage > 1:
            utils.logger.error(f"{__name__}: subset_constraints_coverage must be in the range (0, 1]")
            raise ValueError("subset_constraints_coverage must be in the range (0, 1]")

    self.solve_statistics = solve_statistics
    self.edge_vars = {}
    self.edge_vars_sol = {}
    self.subset_vars = {}

    self.solver_options = solver_options
    if self.solver_options is None:
        self.solver_options = {}
    self.threads = self.solver_options.get("threads", sw.SolverWrapper.threads)

    # optimizations
    if optimization_options is None:
        optimization_options = {}
    self.optimize_with_safe_sequences = optimization_options.get("optimize_with_safe_sequences", AbstractWalkModelDiGraph.optimize_with_safe_sequences)
    self.trusted_edges_for_safety = optimization_options.get("trusted_edges_for_safety", None)
    self.allow_empty_walks = optimization_options.get("allow_empty_walks", AbstractWalkModelDiGraph.allow_empty_walks)
    self.optimize_with_safety_as_subset_constraints = optimization_options.get("optimize_with_safety_as_subset_constraints", AbstractWalkModelDiGraph.optimize_with_safety_as_subset_constraints)
    self.optimize_with_max_safe_antichain_as_subset_constraints = optimization_options.get("optimize_with_max_safe_antichain_as_subset_constraints", AbstractWalkModelDiGraph.optimize_with_max_safe_antichain_as_subset_constraints)
    self.optimize_with_safe_sequences_allow_geq_constraints = optimization_options.get("optimize_with_safe_sequences_allow_geq_constraints", AbstractWalkModelDiGraph.optimize_with_safe_sequences_allow_geq_constraints)
    self.optimize_with_safe_sequences_fix_via_bounds = optimization_options.get("optimize_with_safe_sequences_fix_via_bounds", AbstractWalkModelDiGraph.optimize_with_safe_sequences_fix_via_bounds)
    self.optimize_with_safe_sequences_fix_zero_edges = optimization_options.get(
        "optimize_with_safe_sequences_fix_zero_edges",
        AbstractWalkModelDiGraph.optimize_with_safe_sequences_fix_zero_edges,
    )

    self._is_solved = False

    if not hasattr(self, "solve_time_start") or self.solve_time_start is None:
        self.solve_time_start = time.perf_counter()

compute_edge_max_reachable_value

compute_edge_max_reachable_value(
    u, v
) -> int

Returns an upper bound on how many times edge (u,v) can be used in a single walk layer.

Subclasses should override this to provide a tighter, problem-specific bound. The default falls back to self.max_edge_repetition. Must return a non-negative integer.

Source code in flowpaths/abstractwalkmodeldigraph.py
def compute_edge_max_reachable_value(self, u, v) -> int:
    """
    Returns an upper bound on how many times edge (u,v) can be used in a single walk layer.

    Subclasses should override this to provide a tighter, problem-specific bound.
    The default falls back to `self.max_edge_repetition`.
    Must return a non-negative integer.
    """
    return int(self.max_edge_repetition)

create_solver_and_walks

create_solver_and_walks()

Creates a solver instance and encodes the walks in the graph.

This method initializes the solver with the specified parameters and encodes the walks by creating variables for edges and subsets to cover.

Call this method before encoding other variables and constraints.

Always call this method before encoding other variables and constraints on the walks.

Source code in flowpaths/abstractwalkmodeldigraph.py
def create_solver_and_walks(self):
    """
    Creates a solver instance and encodes the walks in the graph.

    This method initializes the solver with the specified parameters and encodes the walks
    by creating variables for edges and subsets to cover.

    !!! warning "Call this method before encoding other variables and constraints."

        Always call this method before encoding other variables and constraints on the walks.

    """
    self.solver = sw.SolverWrapper(**self.solver_options)

    self._encode_walks()

    self._apply_safety_optimizations()

    self._encode_subset_constraints()

    self.solve_statistics["graph_width"] = self.G.get_width()
    self.solve_statistics["edge_number"] = self.G.number_of_edges()
    self.solve_statistics["node_number"] = self.G.number_of_nodes()

get_lowerbound_k abstractmethod

get_lowerbound_k()

Implement this class in the child class to return a lower bound on the number of solution paths to the model. If you have no lower bound, you should implement this method to return 1.

Source code in flowpaths/abstractwalkmodeldigraph.py
@abstractmethod
def get_lowerbound_k(self):
    """
    Implement this class in the child class to return a lower bound on the number of solution paths to the model.
    If you have no lower bound, you should implement this method to return 1.
    """
    pass

get_objective_value abstractmethod

get_objective_value()

Implement this class in the child class to return the objective value of the model. This is needed to be able to compute the safe paths (i.e. those appearing any optimum solution) for any child class.

A basic objective value is k (when we’re trying to minimize the number of paths). If your model has a different objective, you should implement this method to return the objective value of the model. If your model has no objective value, you should implement this method to return None.

Source code in flowpaths/abstractwalkmodeldigraph.py
@abstractmethod
def get_objective_value(self):
    """
    Implement this class in the child class to return the objective value of the model. This is needed to be able to
    compute the safe paths (i.e. those appearing any optimum solution) for any child class.

    A basic objective value is `k` (when we're trying to minimize the number of paths). If your model has a different
    objective, you should implement this method to return the objective value of the model. If your model has no objective value,
    you should implement this method to return None.
    """
    pass

get_solution abstractmethod

get_solution()

Implement this class in the child class to return the full solution of the model. The solution paths are obtained with the get_solution_paths method.

Source code in flowpaths/abstractwalkmodeldigraph.py
@abstractmethod
def get_solution(self):
    """
    Implement this class in the child class to return the full solution of the model.
    The solution paths are obtained with the get_solution_paths method.
    """
    pass

get_solution_walks

get_solution_walks() -> list

Retrieves the solution walks from the graph, handling cycles with multiplicities.

For each layer i, this reconstructs a single Eulerian walk that uses all edges with positive flow, ensuring complete flow decomposition.

Source code in flowpaths/abstractwalkmodeldigraph.py
def get_solution_walks(self) -> list:
    """
    Retrieves the solution walks from the graph, handling cycles with multiplicities.

    For each layer i, this reconstructs a single Eulerian walk that uses all edges
    with positive flow, ensuring complete flow decomposition.
    """

    if self.edge_vars_sol == {}:
        self.edge_vars_sol = self.solver.get_values(self.edge_vars)
    # utils.logger.debug(f"{__name__}: Getting solution walks with self.edge_vars_sol = {self.edge_vars_sol}")

    walks = []
    for i in range(self.k):
        # Build residual graph for this layer with edge multiplicities
        residual_graph = self._build_residual_graph_for_layer(i)
        # utils.logger.debug(f"{__name__}: residual_graph = {residual_graph}")

        # Check if there's any flow in this layer
        if not residual_graph:
            walks.append([])
            continue

        # Reconstruct complete Eulerian walk
        walk = self._reconstruct_eulerian_walk(residual_graph, i)
        walks.append(walk)

    return walks

is_valid_solution abstractmethod

is_valid_solution() -> bool

Implement this class in the child class to perform a basic check whether the solution is valid.

If you cannot perform such a check, provide an implementation that always returns True.

Source code in flowpaths/abstractwalkmodeldigraph.py
@abstractmethod
def is_valid_solution(self) -> bool:
    """
    Implement this class in the child class to perform a basic check whether the solution is valid.

    If you cannot perform such a check, provide an implementation that always returns True.
    """
    pass

solve

solve() -> bool

Solves the optimization model for the current instance.

Returns
  • True if the model is solved successfully, False otherwise.

The method first checks if an external solution is already provided. If so, it sets the solved attribute to True and returns True.

If not, it optimizes the model using the solver, and records the solve time and solver status in the solve_statistics dictionary. If the solver status indicates an optimal solution (either ‘kOptimal’ (highs) or status code 2 (gurobi)), it sets the solved attribute to True and returns True. Otherwise, it sets the solved attribute to False and returns False.

Source code in flowpaths/abstractwalkmodeldigraph.py
def solve(self) -> bool:
    """
    Solves the optimization model for the current instance.

    Returns
    ----------
    - True if the model is solved successfully, False otherwise.

    The method first checks if an external solution is already provided. If so, it sets the
    solved attribute to True and returns True.

    If not, it optimizes the model using the solver, and records the solve time and solver status
    in the solve_statistics dictionary. If the solver status indicates an optimal solution
    (either 'kOptimal' (highs) or status code 2 (gurobi)), it sets the solved attribute to True and returns True.
    Otherwise, it sets the solved attribute to False and returns False.
    """
    utils.logger.info(f"{__name__}: solving...")

    # self.write_model(f"model-{self.id}.lp")
    start_time = time.perf_counter()
    self.solver.optimize()
    self.solve_statistics[f"solve_time_ilp"] = time.perf_counter() - start_time
    self.solve_statistics[f"solve_time"] = time.perf_counter() - self.solve_time_start
    self.solve_statistics[f"model_status"] = self.solver.get_model_status()
    self.solve_statistics[f"number_of_nontrivial_SCCs"] = self.G.get_number_of_nontrivial_SCCs()
    self.solve_statistics[f"avg_size_of_non_trivial_SCC"] = self.G.get_avg_size_of_non_trivial_SCC()
    self.solve_statistics[f"size_of_largest_SCC"] = self.G.get_size_of_largest_SCC()

    if self.solver.get_model_status() == "kOptimal" or self.solver.get_model_status() == 2:
        self._is_solved = True
        utils.logger.info(f"{__name__}: solved successfully. Objective value: {self.get_objective_value()}")
        return True

    self._is_solved = False
    return False