Unified MILP/LP modelling convenience layer for HiGHS and Gurobi.
This class provides one API that delegates to either the
HiGHS (highspy
) or
Gurobi (gurobipy
) back-end.
Only a very small, stable subset of features needed by flowpaths is
wrapped - it is not a general purpose replacement for the native APIs.
Key capabilities
- Create variables (continuous / integer) in bulk with name prefixing
- Add linear constraints
- Add specialized modelling shortcuts:
- binary * continuous (McCormick) product constraints
- integer * continuous product constraints (bit expansion + binaries)
- piecewise constant constraints (one-hot selection)
- Build linear objectives without triggering a solve (HiGHS) or with native
semantics (Gurobi)
- Optimize with optional custom wall clock timeout (in addition to the
solver internal time limit)
- Retrieve objective, variable names, variable values (optionally enforcing
integrality for binaries), status (mapped to HiGHS style codes)
- Persist model to disk (
.lp
/ .mps
depending on backend support)
Design notes
- A minimal set of solver parameters is exposed via keyword arguments in
__init__
to keep call sites clean.
- Status codes for Gurobi are mapped onto HiGHS style names where a clear
1-to-1 mapping exists (see
gurobi_status_to_highs
).
- A secondary timeout based on POSIX signals can be enabled to guard
against situations where the native solver time limit is not obeyed
precisely. When this fires
did_timeout
is set and the reported status
becomes kTimeLimit
.
Parameters
kwargs :
Flexible configuration. Recognised keys (all optional):
- external_solver
(str): "highs"
(default) or "gurobi"
.
- threads
(int): Thread limit for solver (default: 4
).
- time_limit
(float): Internal solver time limit in seconds
(default: inf
= no limit).
- use_also_custom_timeout
(bool): If True
activate an extra
signal based timeout equal to time_limit
(default False
).
- presolve
(str): HiGHS presolve strategy (default "choose"
).
- log_to_console
(str): "true"
/ "false"
(default
"false"
) - normalized to solver specific flags.
- tolerance
(float): MIP gap, feasibility, integrality tolerance
applied uniformly (default 1e-9
; must be >= 1e-9).
- optimization_sense
(str): "minimize"
or "maximize
(default "minimize"
).
Attributes
external_solver : str
Active backend ("highs"
or "gurobi"
).
solver : object
Underlying solver object (highspy.Highs
or gurobipy.Model
).
time_limit : float
Configured internal solver time limit (seconds).
use_also_custom_timeout : bool
Whether the secondary POSIX signal timeout is enabled.
tolerance : float
Unified tolerance applied to various solver parameters.
optimization_sense : str
"minimize"
or "maximize"
as last set on the objective.
did_timeout : bool
Flag set to True
only when the custom timeout fired.
Raises
ValueError
If unsupported solver name, invalid optimization sense or an invalid
tolerance (< 1e-9) is supplied.
Source code in flowpaths/utils/solverwrapper.py
| def __init__(
self,
**kwargs
):
self.external_solver = kwargs.get("external_solver", SolverWrapper.external_solver) # Default solver
self.time_limit = kwargs.get("time_limit", SolverWrapper.time_limit)
self.use_also_custom_timeout = kwargs.get("use_also_custom_timeout", SolverWrapper.use_also_custom_timeout)
self.tolerance = kwargs.get("tolerance", SolverWrapper.tolerance) # Default tolerance value
if self.tolerance < 1e-9:
utils.logger.error(f"{__name__}: The tolerance value must be >=1e-9.")
raise ValueError("The tolerance value must be >=1e-9.")
self.optimization_sense = kwargs.get("optimization_sense", SolverWrapper.optimization_sense) # Default optimization sense
if self.optimization_sense not in ["minimize", "maximize"]:
utils.logger.error(f"{__name__}: The optimization sense must be either `minimize` or `maximize`.")
raise ValueError(f"Optimization sense {self.optimization_sense} is not supported. Only [\"minimize\", \"maximize\"] are supported.")
self.did_timeout = False
if self.external_solver == "highs":
self.solver = HighsCustom()
self.solver.setOptionValue("solver", "choose")
self.solver.setOptionValue("threads", kwargs.get("threads", SolverWrapper.threads))
self.solver.setOptionValue("time_limit", kwargs.get("time_limit", SolverWrapper.time_limit))
self.solver.setOptionValue("presolve", kwargs.get("presolve", SolverWrapper.presolve))
self.solver.setOptionValue("log_to_console", kwargs.get("log_to_console", SolverWrapper.log_to_console))
self.solver.setOptionValue("mip_rel_gap", self.tolerance)
self.solver.setOptionValue("mip_feasibility_tolerance", self.tolerance)
self.solver.setOptionValue("mip_abs_gap", self.tolerance)
self.solver.setOptionValue("mip_rel_gap", self.tolerance)
self.solver.setOptionValue("primal_feasibility_tolerance", self.tolerance)
elif self.external_solver == "gurobi":
import gurobipy
self.env = gurobipy.Env(empty=True)
self.env.setParam("OutputFlag", 0)
self.env.setParam("LogToConsole", 1 if kwargs.get("log_to_console", SolverWrapper.log_to_console) == "true" else 0)
self.env.setParam("OutputFlag", 1 if kwargs.get("log_to_console", SolverWrapper.log_to_console) == "true" else 0)
self.env.setParam("TimeLimit", kwargs.get("time_limit", SolverWrapper.time_limit))
self.env.setParam("Threads", kwargs.get("threads", SolverWrapper.threads))
self.env.setParam("MIPGap", self.tolerance)
self.env.setParam("IntFeasTol", self.tolerance)
self.env.setParam("FeasibilityTol", self.tolerance)
self.env.start()
self.solver = gurobipy.Model(env=self.env)
else:
utils.logger.error(f"{__name__}: Unsupported solver type `{self.external_solver}`. Supported solvers are `highs` and `gurobi`.")
raise ValueError(
f"Unsupported solver type `{self.external_solver}`, supported solvers are `highs` and `gurobi`."
)
utils.logger.debug(f"{__name__}: solver_options (kwargs) = {kwargs}")
# Pending bound updates to apply in batch before solving
# Stores backend variables directly
self._pending_fix_vars = [] # list[var]
self._pending_fix_vals = [] # list[float]
self._pending_lb_vars = [] # list[var]
self._pending_lb_vals = [] # list[float]
|
add_binary_continuous_product_constraint
add_binary_continuous_product_constraint(
binary_var,
continuous_var,
product_var,
lb,
ub,
name: str,
)
Description
This function adds constraints to model the equality: binary_var
* continuous_var
= product_var
.
Assumptions
binary_var
\(\in [0,1]\)
- lb ≤
continuous_var
≤ ub
Note
This works correctly also if continuous_var
is an integer variable.
Parameters:
Name |
Type |
Description |
Default |
binary_var
|
variable
|
|
required
|
continuous_var
|
variable
|
The continuous variable (can also be integer).
|
required
|
product_var
|
variable
|
The variable that should be equal to the product of the binary and continuous variables.
|
required
|
lb
|
float
|
The lower bound of the continuous variable.
|
required
|
ub
|
float
|
The upper bound of the continuous variable.
|
required
|
name
|
str
|
The name of the constraint.
|
required
|
Source code in flowpaths/utils/solverwrapper.py
| def add_binary_continuous_product_constraint(self, binary_var, continuous_var, product_var, lb, ub, name: str):
"""
Description
-----------
This function adds constraints to model the equality: `binary_var` * `continuous_var` = `product_var`.
Assumptions:
- `binary_var` $\in [0,1]$
- lb ≤ `continuous_var` ≤ ub
Note:
This works correctly also if `continuous_var` is an integer variable.
Args:
binary_var (variable): The binary variable.
continuous_var (variable): The continuous variable (can also be integer).
product_var (variable): The variable that should be equal to the product of the binary and continuous variables.
lb (float): The lower bound of the continuous variable.
ub (float): The upper bound of the continuous variable.
name (str): The name of the constraint.
"""
self.add_constraint(product_var <= ub * binary_var, name=name + "_a")
self.add_constraint(product_var >= lb * binary_var, name=name + "_b")
self.add_constraint(product_var <= continuous_var - lb * (1 - binary_var), name=name + "_c")
self.add_constraint(product_var >= continuous_var - ub * (1 - binary_var), name=name + "_d")
|
add_constraint
add_constraint(expr, name='')
Add a linear (in)equation to the model.
Parameters
expr : linear expression / bool
The solver specific constraint expression.
name : str, optional
Optional identifier for the constraint.
Source code in flowpaths/utils/solverwrapper.py
| def add_constraint(self, expr, name=""):
"""Add a linear (in)equation to the model.
Parameters
----------
expr : linear expression / bool
The solver specific constraint expression.
name : str, optional
Optional identifier for the constraint.
"""
if self.external_solver == "highs":
self.solver.addConstr(expr, name=name)
elif self.external_solver == "gurobi":
self.solver.addConstr(expr, name=name)
|
add_integer_continuous_product_constraint
add_integer_continuous_product_constraint(
integer_var,
continuous_var,
product_var,
lb,
ub,
name: str,
)
This function adds constraints to model the equality
integer_var * continuous_var = product_var
Assumptions
lb <= product_var <= ub
Note
This works correctly also if continuous_var
is an integer variable.
Parameters
binary_var : Variable
The binary variable.
continuous_var : Variable
The continuous variable (can also be integer).
product_var : Variable
The variable that should be equal to the product of the binary and continuous variables.
lb, ub : float
The lower and upper bounds of the continuous variable.
name : str
The name of the constraint
Source code in flowpaths/utils/solverwrapper.py
| def add_integer_continuous_product_constraint(self, integer_var, continuous_var, product_var, lb, ub, name: str):
"""
This function adds constraints to model the equality:
integer_var * continuous_var = product_var
Assumptions
-----------
lb <= product_var <= ub
!!!tip "Note"
This works correctly also if `continuous_var` is an integer variable.
Parameters
----------
binary_var : Variable
The binary variable.
continuous_var : Variable
The continuous variable (can also be integer).
product_var : Variable
The variable that should be equal to the product of the binary and continuous variables.
lb, ub : float
The lower and upper bounds of the continuous variable.
name : str
The name of the constraint
"""
num_bits = ceil(log2(ub + 1))
bits = list(range(num_bits))
binary_vars = self.add_variables(
indexes=bits,
name_prefix=f"binary_{name}",
lb=0,
ub=1,
var_type="integer"
)
# We encode integer_var == sum(binary_vars[i] * 2^i)
self.add_constraint(
self.quicksum(binary_vars[i] * 2**i for i in bits)
== integer_var,
name=f"{name}_int_eq"
)
comp_vars = self.add_variables(
indexes=bits,
name_prefix=f"comp_{name}",
lb=lb,
ub=ub,
var_type="continuous"
)
# We encode comp_vars[i] == binary_vars[i] * continuous_var
for i in bits:
self.add_binary_continuous_product_constraint(
binary_var=binary_vars[i],
continuous_var=continuous_var,
product_var=comp_vars[i],
lb=lb,
ub=ub,
name=f"product_{i}_{name}"
)
# We encode product_var == sum_{i in bits} comp_vars[i] * 2^i
self.add_constraint(
self.quicksum(comp_vars[i] * 2**i for i in bits)
== product_var,
name=f"{name}_prod_eq"
)
|
add_piecewise_constant_constraint
add_piecewise_constant_constraint(
x,
y,
ranges: list,
constants: list,
name_prefix: str,
)
Enforces that variable y
equals a constant from constants
depending on the range that x
falls into.
For each piece i
if x in [ranges[i][0], ranges[i][1]] then y = constants[i].
Assumptions
- The ranges must be non-overlapping. Otherwise, if x belongs to more ranges, the solver will choose one arbitrarily.
- The value of x must be within the union of the ranges. Otherwise the solver will not find a feasible solution.
This is modeled by:
- introducing binary variables z[i] with sum(z) = 1,
- for each piece i:
x >= L_i - M*(1 - z[i])
x <= U_i + M*(1 - z[i])
y <= constant[i] + M*(1 - z[i])
y >= constant[i] - M*(1 - z[i])
Parameters
x: The continuous variable (created earlier) whose value determines the segment.
y: The continuous variable whose value equals the corresponding constant.
ranges: List of tuples [(L0, U0), (L1, U1), …]
constants: List of constants [c0, c1, …] for each segment.
name_prefix: A prefix for naming the added variables and constraints.
Returns
y: The created piecewise output variable.
Source code in flowpaths/utils/solverwrapper.py
| def add_piecewise_constant_constraint(
self, x, y, ranges: list, constants: list, name_prefix: str
):
"""
Enforces that variable `y` equals a constant from `constants` depending on the range that `x` falls into.
For each piece i:
`if x in [ranges[i][0], ranges[i][1]] then y = constants[i].`
Assumptions:
- The ranges must be non-overlapping. Otherwise, if x belongs to more ranges, the solver will choose one arbitrarily.
- The value of x must be within the union of the ranges. Otherwise the solver will not find a feasible solution.
This is modeled by:
- introducing binary variables z[i] with sum(z) = 1,
- for each piece i:
`x >= L_i - M*(1 - z[i])`
`x <= U_i + M*(1 - z[i])`
`y <= constant[i] + M*(1 - z[i])`
`y >= constant[i] - M*(1 - z[i])`
Parameters
----------
x: The continuous variable (created earlier) whose value determines the segment.
y: The continuous variable whose value equals the corresponding constant.
ranges: List of tuples [(L0, U0), (L1, U1), ...]
constants: List of constants [c0, c1, ...] for each segment.
name_prefix: A prefix for naming the added variables and constraints.
Returns
-------
y: The created piecewise output variable.
"""
if len(ranges) != len(constants):
utils.logger.error(f"{__name__}: The length of `ranges` and `constants` must be the same.")
raise ValueError("`ranges` and `constants` must have the same length.")
pieces = len(ranges)
Ls = [r[0] for r in ranges]
Us = [r[1] for r in ranges]
M = (max(Us) - min(Ls)) * 2
# Create binary variables z[i] for each piece.
z = self.add_variables(
[(i) for i in range(pieces)],
name_prefix=f"z_{name_prefix}",
lb=0,
ub=1,
var_type="integer"
)
# Enforce that exactly one piece is active: sum_i z[i] == 1.
self.add_constraint(self.quicksum(z[i] for i in range(pieces)) == 1, name=f"sum_z_{name_prefix}")
# For each piece i, add the constraints:
for i in range(pieces):
L = Ls[i]
U = Us[i]
c = constants[i]
# Link x with the range [L, U] if piece i is active.
self.add_constraint(x >= L - M * (1 - z[i]), name=f"{name_prefix}_L_{i}")
self.add_constraint(x <= U + M * (1 - z[i]), name=f"{name_prefix}_U_{i}")
self.add_constraint(y <= c + M * (1 - z[i]), name=f"{name_prefix}_yU_{i}")
self.add_constraint(y >= c - M * (1 - z[i]), name=f"{name_prefix}_yL_{i}")
|
add_variables
add_variables(
indexes,
name_prefix: str,
lb=0,
ub=1,
var_type="integer",
)
Create a set of variables sharing a common name prefix.
Important: Avoid collisions!
This function does not track or enforce unique/non-overlapping
prefixes. The caller is responsible for choosing prefixes that do not
create ambiguous names when mixed with other variables.
Parameters
indexes : iterable
Iterable of index labels (numbers or hashables) used only to
suffix the variable names for uniqueness.
name_prefix : str
Prefix for each created variable (e.g. x_
). Must be unique with
respect to existing prefixes (no prefix / super-prefix relations).
lb, ub : float | dict | sequence, default (0, 1)
Lower and upper bounds for created variables. Can be:
- scalars (applied to all indexes), or
- dicts mapping each index -> bound, or
- sequences aligned with indexes
order (same length).
var_type : {“integer”, “continuous”}, default “integer”
Variable domain type.
Returns
dict | list
Mapping from provided index to underlying solver variable objects
(HiGHS returns an internal structure; Gurobi returns a dict).
Source code in flowpaths/utils/solverwrapper.py
| def add_variables(self, indexes, name_prefix: str, lb=0, ub=1, var_type="integer"):
"""Create a set of variables sharing a common name prefix.
!!! warning "Important: Avoid collisions!"
This function does not track or enforce unique/non-overlapping
prefixes. The caller is responsible for choosing prefixes that do not
create ambiguous names when mixed with other variables.
Parameters
----------
indexes : iterable
Iterable of index labels (numbers or hashables) used *only* to
suffix the variable names for uniqueness.
name_prefix : str
Prefix for each created variable (e.g. ``x_``). Must be unique with
respect to existing prefixes (no prefix / super-prefix relations).
lb, ub : float | dict | sequence, default (0, 1)
Lower and upper bounds for created variables. Can be:
- scalars (applied to all indexes), or
- dicts mapping each index -> bound, or
- sequences aligned with ``indexes`` order (same length).
var_type : {"integer", "continuous"}, default "integer"
Variable domain type.
Returns
-------
dict | list
Mapping from provided index to underlying solver variable objects
(HiGHS returns an internal structure; Gurobi returns a dict).
"""
# No internal tracking of prefixes; caller must avoid collisions.
# Normalize bounds to per-index arrays when necessary
def _materialize_bounds(param, default_value, param_name):
# scalar
if isinstance(param, (int, float)):
return [float(param)] * len(indexes)
# dict mapping index -> value
if isinstance(param, dict):
vals = []
missing = []
for idx in indexes:
if idx in param:
vals.append(float(param[idx]))
else:
missing.append(idx)
if missing:
utils.logger.error(f"{__name__}: Missing {param_name} for indexes: {missing[:3]}{'...' if len(missing)>3 else ''}")
raise ValueError(f"Missing {param_name} for some indexes")
return vals
# sequence aligned with indexes
try:
seq = list(param)
if len(seq) != len(indexes):
utils.logger.error(f"{__name__}: Length of {param_name} ({len(seq)}) does not match number of indexes ({len(indexes)}).")
raise ValueError(f"Length of {param_name} does not match number of indexes.")
return [float(x) for x in seq]
except TypeError:
# Not iterable; fall back to default scalar for all
return [float(default_value)] * len(indexes)
lbs = _materialize_bounds(lb, 0.0, "lb")
ubs = _materialize_bounds(ub, 1.0, "ub")
if self.external_solver == "highs":
var_type_map = {
"integer": highspy.HighsVarType.kInteger,
"continuous": highspy.HighsVarType.kContinuous,
}
return self.solver.addVariables(
indexes,
lb=lbs,
ub=ubs,
type=var_type_map[var_type],
name_prefix=name_prefix)
elif self.external_solver == "gurobi":
import gurobipy
var_type_map = {
"integer": gurobipy.GRB.INTEGER,
"continuous": gurobipy.GRB.CONTINUOUS,
}
# Single batched call using keys with per-index bounds
keys = list(indexes)
lb_map = {idx: float(lbs[pos]) for pos, idx in enumerate(keys)}
ub_map = {idx: float(ubs[pos]) for pos, idx in enumerate(keys)}
vars_td = self.solver.addVars(
keys,
lb=lb_map,
ub=ub_map,
vtype=var_type_map[var_type],
name=name_prefix,
)
# Keep model in a consistent state
self.solver.update()
return vars_td
|
fix_variable
fix_variable(
var,
value: Union[int, float],
)
Fix an existing variable to a constant value by tightening its bounds.
This avoids adding an explicit equality constraint (var == value) which can
slow down solving compared to changing bounds directly.
Parameters
var : backend variable object
The variable returned previously by add_variables
.
value : int | float
The value to which the variable should be fixed.
Source code in flowpaths/utils/solverwrapper.py
| def fix_variable(self, var, value: Union[int, float]):
"""Fix an existing variable to a constant value by tightening its bounds.
This avoids adding an explicit equality constraint (var == value) which can
slow down solving compared to changing bounds directly.
Parameters
----------
var : backend variable object
The variable returned previously by ``add_variables``.
value : int | float
The value to which the variable should be fixed.
"""
# Normalize to float for solvers expecting floating bounds
value = float(value)
if self.external_solver == "gurobi":
# Gurobi exposes direct LB / UB attributes
try:
var.LB = value
var.UB = value
except Exception as e:
utils.logger.error(f"{__name__}: Could not fix gurobi variable: {e}")
raise
elif self.external_solver == "highs":
# HiGHS: change column bounds using internal index of variable
try:
self.solver.changeColsBounds(
1,
np.array([var.index], dtype=np.int32),
np.array([value], dtype=np.float64 if isinstance(value, float) else np.int64),
np.array([value], dtype=np.float64 if isinstance(value, float) else np.int64),
)
except Exception as e:
utils.logger.error(f"{__name__}: Could not fix highs variable: {e}")
raise
|
get_all_variable_names
Return names for all variables in solver insertion order.
Source code in flowpaths/utils/solverwrapper.py
| def get_all_variable_names(self):
"""Return names for all variables in solver insertion order."""
if self.external_solver == "highs":
return self.solver.allVariableNames()
elif self.external_solver == "gurobi":
return [var.VarName for var in self.solver.getVars()]
|
get_all_variable_values
get_all_variable_values()
Return values for all variables in solver insertion order.
Source code in flowpaths/utils/solverwrapper.py
| def get_all_variable_values(self):
"""Return values for all variables in solver insertion order."""
if self.external_solver == "highs":
return self.solver.allVariableValues()
elif self.external_solver == "gurobi":
return [var.X for var in self.solver.getVars()]
|
get_model_status
get_model_status(raw=False)
Return HiGHS style model status string (or raw Gurobi code).
If the custom timeout was triggered the synthetic status kTimeLimit
is returned irrespective of the underlying solver state.
Parameters
raw : bool, default False
When using Gurobi: if True
return the untouched integer status
code; otherwise attempt to map to a HiGHS style enum name.
Returns
str | int
Status name (always a string for HiGHS). Integer code for Gurobi
only when raw=True
.
Source code in flowpaths/utils/solverwrapper.py
| def get_model_status(self, raw = False):
"""Return HiGHS style model status string (or raw Gurobi code).
If the *custom* timeout was triggered the synthetic status ``kTimeLimit``
is returned irrespective of the underlying solver state.
Parameters
----------
raw : bool, default False
When using Gurobi: if ``True`` return the untouched integer status
code; otherwise attempt to map to a HiGHS style enum name.
Returns
-------
str | int
Status name (always a string for HiGHS). Integer code for Gurobi
only when ``raw=True``.
"""
# If the solver has timed out with our custom time limit, we return the timeout status
# This is set in the __run_with_timeout function
if self.did_timeout:
return "kTimeLimit"
# If the solver has not timed out, we return the model status
if self.external_solver == "highs":
return self.solver.getModelStatus().name
elif self.external_solver == "gurobi":
return SolverWrapper.gurobi_status_to_highs.get(self.solver.status, self.solver.status) if not raw else self.solver.status
|
get_objective_value
Return objective value of last solve.
Returns
float
Objective value according to the configured optimization sense.
Source code in flowpaths/utils/solverwrapper.py
| def get_objective_value(self):
"""Return objective value of last solve.
Returns
-------
float
Objective value according to the configured optimization sense.
"""
if self.external_solver == "highs":
return self.solver.getObjectiveValue()
elif self.external_solver == "gurobi":
return self.solver.objVal
|
get_values
get_values(
variables,
binary_values: bool = False,
) -> dict
Return solution values for variables without name parsing.
Parameters
variables : iterable | mapping
Either an iterable of (index, variable) pairs, or a mapping where
keys are indices and values are variable objects (e.g., a dict or
a Gurobi tupledict).
binary_values : bool, default False
If True, round values to 0/1 and validate against tolerance.
Returns
dict
Dictionary mapping each provided index to its solution value.
Notes
- For HiGHS, values are retrieved using the internal column index of each
variable via a single call to
allVariableValues
.
- For Gurobi, values are read from
Var.X
.
- Indices are taken from the first element of each tuple in
variables
(when an iterable of pairs is supplied). If a mapping is supplied,
its items() are used.
Source code in flowpaths/utils/solverwrapper.py
| def get_values(self, variables, binary_values: bool = False) -> dict:
"""Return solution values for variables without name parsing.
Parameters
----------
variables : iterable | mapping
Either an iterable of (index, variable) pairs, or a mapping where
keys are indices and values are variable objects (e.g., a dict or
a Gurobi tupledict).
binary_values : bool, default False
If True, round values to 0/1 and validate against tolerance.
Returns
-------
dict
Dictionary mapping each provided index to its solution value.
Notes
-----
- For HiGHS, values are retrieved using the internal column index of each
variable via a single call to ``allVariableValues``.
- For Gurobi, values are read from ``Var.X``.
- Indices are taken from the first element of each tuple in ``variables``
(when an iterable of pairs is supplied). If a mapping is supplied,
its items() are used.
"""
# Prepare a value accessor per backend
if self.external_solver == "highs":
all_vals = self.get_all_variable_values()
def _val_of(v):
idx = getattr(v, "index", None)
if idx is None:
raise Exception("HiGHS variable object missing 'index' attribute.")
return all_vals[idx]
elif self.external_solver == "gurobi":
def _val_of(v):
return v.X
else:
raise ValueError(f"Unsupported solver type '{self.external_solver}'.")
def _maybe_round_binary(val):
if not binary_values:
return val
tol = max(1e-9, getattr(self, "tolerance", 1e-9))
rv = int(round(val))
if rv not in (0, 1) or abs(val - rv) > tol:
raise Exception(f"Variable has non-binary value {val}")
return rv
# Build an iterator of (index, variable) pairs
try:
pair_iter = variables.items()
except AttributeError:
# variables may already be an iterable of pairs or a mapping that
# doesn't expose items(); handle both
def _pair_gen():
for elem in variables:
if isinstance(elem, tuple) and len(elem) == 2:
yield elem # (index, var)
else:
# Assume elem is a key into a mapping supporting __getitem__
yield (elem, variables[elem])
pair_iter = _pair_gen()
result = {}
for key, var in pair_iter:
value = _val_of(var)
result[key] = _maybe_round_binary(value)
return result
|
get_variable_values
get_variable_values(
name_prefix,
index_types: list,
binary_values: bool = False,
) -> dict
Deprecated
Use get_values(...)
instead.
Retrieve the values of variables belonging to a given prefix.
This method matches variables using one of these forms for the given
name_prefix
:
- Structured names: <prefix>(i, j, ...)
or <prefix>[i, j, ...]
- Legacy single numeric suffix: <prefix>k
or <prefix>_k
- Exact scalar variable name: <prefix>
(when index_types
is empty)
Under these rules, overlapping prefixes (e.g., x
and x_long
)
won’t interfere with each other. Callers must still avoid custom ad-hoc
naming that mimics these patterns for different variables.
Parameters:
Name |
Type |
Description |
Default |
name_prefix
|
str
|
The prefix of the variable names to filter.
|
required
|
index_types
|
list
|
A list of types corresponding to the indices of the variables.
Each type in the list is used to cast the string indices to
the appropriate type.
If empty, then it is assumed that the variable has no index, and does exact matching with the variable name.
|
required
|
binary_values
|
bool
|
If True, ensures that the variable values (rounded) are
binary (0 or 1). Defaults to False.
|
False
|
Returns:
Name | Type |
Description |
values |
dict
|
A dictionary where the keys are the indices of the variables (as tuples or
single values) and the values are the corresponding variable values.
If index_types is empty, then the unique key is 0 and the value is the variable value.
|
Raises:
Type |
Description |
Exception
|
If the length of index_types does not match the number of indices
in a variable name.
|
Exception
|
If binary_values is True and a variable value (rounded) is not binary.
|
Source code in flowpaths/utils/solverwrapper.py
| def get_variable_values(
self, name_prefix, index_types: list, binary_values: bool = False
) -> dict:
"""
!!! warning "Deprecated"
Use `get_values(...)` instead.
Retrieve the values of variables belonging to a given prefix.
This method matches variables using one of these forms for the given
``name_prefix``:
- Structured names: ``<prefix>(i, j, ...)`` or ``<prefix>[i, j, ...]``
- Legacy single numeric suffix: ``<prefix>k`` or ``<prefix>_k``
- Exact scalar variable name: ``<prefix>`` (when ``index_types`` is empty)
Under these rules, overlapping prefixes (e.g., ``x`` and ``x_long``)
won't interfere with each other. Callers must still avoid custom ad-hoc
naming that mimics these patterns for different variables.
Args:
name_prefix (str): The prefix of the variable names to filter.
index_types (list): A list of types corresponding to the indices of the variables.
Each type in the list is used to cast the string indices to
the appropriate type.
If empty, then it is assumed that the variable has no index, and does exact matching with the variable name.
binary_values (bool, optional): If True, ensures that the variable values (rounded) are
binary (0 or 1). Defaults to False.
Returns:
values: A dictionary where the keys are the indices of the variables (as tuples or
single values) and the values are the corresponding variable values.
If index_types is empty, then the unique key is 0 and the value is the variable value.
Raises:
Exception: If the length of `index_types` does not match the number of indices
in a variable name.
Exception: If `binary_values` is True and a variable value (rounded) is not binary.
"""
# Emit a deprecation warning (hidden by default unless enabled by filters)
warnings.warn(
"SolverWrapper.get_variable_values is deprecated and will be removed in a future release. "
"Use SolverWrapper.get_values(...) instead.",
DeprecationWarning,
stacklevel=2,
)
varNames = self.get_all_variable_names()
varValues = self.get_all_variable_values()
values: dict = {}
def _cast_components(comps, types):
casted = []
for c, t in zip(comps, types):
if t is int:
casted.append(int(c))
elif t is float:
casted.append(float(c))
elif t is str:
casted.append(str(c))
else:
casted.append(t(c))
if len(casted) == 0:
return ()
if len(casted) == 1:
return casted[0]
return tuple(casted)
simple_numeric = re.compile(r"^-?\d+$")
for i, var in enumerate(varNames):
val = varValues[i]
# Scalar exact name
if not index_types:
if var == name_prefix:
values[0] = val
if binary_values:
rv = int(round(values[0]))
if rv not in (0, 1):
raise Exception(f"Variable {var} has value {values[0]}, which is not binary.")
values[0] = rv
# exact scalar match is unique
continue
else:
continue
# Structured prefix(name) or prefix[name]
comps = self.parse_var_name(var, name_prefix)
if comps is not None:
if len(comps) != len(index_types):
# Ignore mismatched arity
continue
try:
key = _cast_components(comps, index_types)
except Exception:
# Skip if casting fails
continue
values[key] = val
continue
# Legacy numeric suffix: prefix<idx> or prefix_<idx>
if len(index_types) == 1 and var.startswith(name_prefix):
suffix = var[len(name_prefix):]
if suffix.startswith("_"):
suffix_try = suffix[1:]
else:
suffix_try = suffix
if simple_numeric.match(suffix_try):
try:
key = _cast_components([suffix_try], index_types)
values[key] = val
except Exception:
pass
if binary_values:
tol = max(1e-9, getattr(self, "tolerance", 1e-9))
for k, v in list(values.items()):
rv = int(round(v))
if rv not in (0, 1) or abs(v - rv) > tol:
raise Exception(f"Variable {name_prefix}{k if k!=0 else ''} has non-binary value {v}")
values[k] = rv
return values
|
optimize
Run the solver.
Behaviour:
- If time_limit
is infinity OR use_also_custom_timeout
is
False
we rely solely on the backend’s time limit handling.
- Otherwise we also arm a POSIX signal based timeout (coarse, whole
seconds) which, when firing, sets did_timeout
. The underlying
solver is not forcibly terminated beyond the signal alarm; we rely on
cooperative interruption.
Source code in flowpaths/utils/solverwrapper.py
| def optimize(self):
"""Run the solver.
Behaviour:
- If ``time_limit`` is infinity OR ``use_also_custom_timeout`` is
``False`` we rely solely on the backend's time limit handling.
- Otherwise we also arm a POSIX signal based timeout (coarse, whole
seconds) which, when firing, sets ``did_timeout``. The underlying
solver is not forcibly terminated beyond the signal alarm; we rely on
cooperative interruption.
"""
# Resetting the timeout flag
self.did_timeout = False
# For both solvers, we have the same function to call
# If the time limit is infinite, we call the optimize function directly
# Otherwise, we call the function with a timeout
# Apply any queued bound updates right before solving
self._apply_pending_bound_updates()
if self.time_limit == float('inf') or (not self.use_also_custom_timeout):
self.solver.optimize()
else:
utils.logger.debug(f"{__name__}: Running also with use_also_custom_timeout ({self.time_limit} sec)")
self._run_with_timeout(self.time_limit, self.solver.optimize)
|
parse_var_name
parse_var_name(
string, name_prefix
)
Parse a variable name and extract indices inside parentheses or brackets.
Supported forms:
- ``<prefix>(i,j,...)`` (HiGHS style / tuple repr)
- ``<prefix>[i,j,...]`` (Gurobi addVars style)
- Single-suffix names ``<prefix><i>`` are handled elsewhere.
Returns a list of raw index components as strings (quotes stripped).
Commas inside nested parentheses/brackets are treated as part of the
component, e.g. var((a,b))
-> [“(a,b)”].
Source code in flowpaths/utils/solverwrapper.py
| def parse_var_name(self, string, name_prefix):
"""Parse a variable name and extract indices inside parentheses or brackets.
Supported forms:
- ``<prefix>(i,j,...)`` (HiGHS style / tuple repr)
- ``<prefix>[i,j,...]`` (Gurobi addVars style)
- Single-suffix names ``<prefix><i>`` are handled elsewhere.
Returns a list of raw index components as strings (quotes stripped).
Commas inside nested parentheses/brackets are treated as part of the
component, e.g. ``var((a,b))`` -> ["(a,b)"].
"""
# Try parentheses first
match = re.match(rf"{re.escape(name_prefix)}\(\s*(.*?)\s*\)$", string)
if match:
components_str = match.group(1)
else:
# Try square brackets (Gurobi addVars style)
match = re.match(rf"{re.escape(name_prefix)}\[\s*(.*?)\s*\]$", string)
if match:
components_str = match.group(1)
else:
# Not a structured name for this prefix
return None
# Split at top-level commas only; keep commas inside nested () or []
if components_str.strip() == "":
return []
components = []
buf = []
in_quote = False
depth_paren = 0
depth_brack = 0
for ch in components_str:
if in_quote:
buf.append(ch)
if ch == "'":
in_quote = False
continue
if ch == "'":
in_quote = True
buf.append(ch)
continue
if ch == '(':
depth_paren += 1
buf.append(ch)
continue
if ch == ')' and depth_paren > 0:
depth_paren -= 1
buf.append(ch)
continue
if ch == '[':
depth_brack += 1
buf.append(ch)
continue
if ch == ']' and depth_brack > 0:
depth_brack -= 1
buf.append(ch)
continue
if ch == ',' and depth_paren == 0 and depth_brack == 0:
token = ''.join(buf).strip()
if len(token) >= 2 and token[0] == "'" and token[-1] == "'":
token = token[1:-1]
components.append(token)
buf = []
else:
buf.append(ch)
# Flush last token
token = ''.join(buf).strip()
if len(token) >= 2 and token[0] == "'" and token[-1] == "'":
token = token[1:-1]
if token != "" or components_str.strip() != "":
components.append(token)
return components
|
print_variable_names_values
print_variable_names_values()
Print name = value
lines for every variable (debug helper).
Source code in flowpaths/utils/solverwrapper.py
| def print_variable_names_values(self):
"""Print ``name = value`` lines for every variable (debug helper)."""
varNames = self.get_all_variable_names()
varValues = self.get_all_variable_values()
for index, var in enumerate(varNames):
print(f"{var} = {varValues[index]}")
|
queue_fix_variable
queue_fix_variable(
var,
value: Union[int, float],
)
Queue a variable to be fixed (LB=UB=value) in a later batch update.
Source code in flowpaths/utils/solverwrapper.py
| def queue_fix_variable(self, var, value: Union[int, float]):
"""Queue a variable to be fixed (LB=UB=value) in a later batch update."""
self._pending_fix_vars.append(var)
self._pending_fix_vals.append(float(value))
|
queue_set_var_lower_bound
queue_set_var_lower_bound(
var, lb: Union[int, float]
)
Queue a variable to have its lower bound raised to lb
in batch.
Source code in flowpaths/utils/solverwrapper.py
| def queue_set_var_lower_bound(self, var, lb: Union[int, float]):
"""Queue a variable to have its lower bound raised to ``lb`` in batch."""
self._pending_lb_vars.append(var)
self._pending_lb_vals.append(float(lb))
|
quicksum
Backend agnostic fast summation of linear terms.
Parameters
expr : iterable
Iterable of linear terms.
Returns
linear expression
A solver specific linear expression representing the sum.
Source code in flowpaths/utils/solverwrapper.py
| def quicksum(self, expr):
"""Backend agnostic fast summation of linear terms.
Parameters
----------
expr : iterable
Iterable of linear terms.
Returns
-------
linear expression
A solver specific linear expression representing the sum.
"""
if self.external_solver == "highs":
return self.solver.qsum(expr)
elif self.external_solver == "gurobi":
import gurobipy
return gurobipy.quicksum(expr)
|
set_objective
set_objective(
expr, sense="minimize"
)
Set (and replace) the linear objective.
For HiGHS this delegates to HighsCustom.set_objective_without_solving
(i.e. does not trigger a solve). For Gurobi it uses the native
setObjective
method.
Parameters
expr : linear expression
Objective linear expression.
sense : {“minimize”, “min”, “maximize”, “max”}, default “minimize”
Optimization direction.
Raises
ValueError
If sense
is invalid.
Source code in flowpaths/utils/solverwrapper.py
| def set_objective(self, expr, sense="minimize"):
"""Set (and replace) the linear objective.
For HiGHS this delegates to ``HighsCustom.set_objective_without_solving``
(i.e. does not trigger a solve). For Gurobi it uses the native
``setObjective`` method.
Parameters
----------
expr : linear expression
Objective linear expression.
sense : {"minimize", "min", "maximize", "max"}, default "minimize"
Optimization direction.
Raises
------
ValueError
If ``sense`` is invalid.
"""
if sense not in ["minimize", "min", "maximize", "max"]:
utils.logger.error(f"{__name__}: The objective sense must be either `minimize` or `maximize`.")
raise ValueError(f"Objective sense {sense} is not supported. Only [\"minimize\", \"min\", \"maximize\", \"max\"] are supported.")
self.optimization_sense = sense
if self.external_solver == "highs":
self.solver.set_objective_without_solving(expr, sense=sense)
elif self.external_solver == "gurobi":
import gurobipy
self.solver.setObjective(
expr,
gurobipy.GRB.MINIMIZE if sense in ["minimize", "min"] else gurobipy.GRB.MAXIMIZE,
)
|
write_model
Persist model to a file supported by the backend.
Parameters
filename : str | os.PathLike
Target path. HiGHS chooses format based on extension; Gurobi uses
native write
method behaviour.
Source code in flowpaths/utils/solverwrapper.py
| def write_model(self, filename):
"""Persist model to a file supported by the backend.
Parameters
----------
filename : str | os.PathLike
Target path. HiGHS chooses format based on extension; Gurobi uses
native ``write`` method behaviour.
"""
if self.external_solver == "highs":
self.solver.writeModel(filename)
elif self.external_solver == "gurobi":
self.solver.write(filename)
|