#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import warnings
from collections.abc import Callable
from typing import Any
import numpy as np
import torch
from botorch.acquisition.multi_objective.objective import (
IdentityMCMultiOutputObjective,
MCMultiOutputObjective,
)
from botorch.acquisition.multioutput_acquisition import MultiOutputAcquisitionFunction
from botorch.exceptions import BotorchWarning
from botorch.utils.multi_objective.hypervolume import get_hypervolume_maximizing_subset
from botorch.utils.multi_objective.pareto import is_non_dominated
from torch import Tensor
try:
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.core.problem import Problem
from pymoo.core.repair import Repair
from pymoo.optimize import minimize
from pymoo.termination.max_gen import MaximumGenerationTermination
[docs]
class DiscreteParameterRepair(Repair):
"""Pymoo Repair operator that rounds discrete parameters to valid values.
This repair operator is applied after each generation to ensure that
discrete parameters are snapped to their nearest allowed values.
"""
def __init__(
self,
discrete_choices: dict[int, list[float]],
) -> None:
"""Initialize the repair operator.
Args:
discrete_choices: A mapping from dimension index to allowed discrete
values. Only dimensions in this mapping will be rounded.
"""
super().__init__()
self.discrete_choices = discrete_choices
def _do(
self,
problem: Problem,
X: np.ndarray,
**kwargs: Any,
) -> np.ndarray:
"""Round discrete dimensions to nearest allowed values.
Args:
problem: The pymoo Problem instance.
X: A ``pop_size x n_var`` array of candidate solutions.
**kwargs: Additional keyword arguments.
Returns:
The repaired array with discrete dimensions rounded.
"""
for dim, allowed_values in self.discrete_choices.items():
allowed = np.array(allowed_values)
# For each candidate, find nearest allowed value
vals = X[:, dim]
# Compute distances to all allowed values: (pop_size, num_choices)
distances = np.abs(vals[:, np.newaxis] - allowed[np.newaxis, :])
# Find index of nearest allowed value
nearest_idx = np.argmin(distances, axis=1)
# Replace with nearest allowed value
X[:, dim] = allowed[nearest_idx]
return X
[docs]
class BotorchPymooProblem(Problem):
def __init__(
self,
n_var: int,
n_obj: int,
xl: np.ndarray,
xu: np.ndarray,
acqf: MultiOutputAcquisitionFunction,
dtype: torch.dtype,
device: torch.device,
ref_point: Tensor | None = None,
objective: MCMultiOutputObjective | None = None,
constraints: list[Callable[[Tensor], Tensor]] | None = None,
inequality_constraints: list[tuple[Tensor, Tensor, float]] | None = None,
) -> None:
"""PyMOO problem for optimizing the model posterior mean using NSGA-II.
This is instantiated and used within ``optimize_with_nsgaii`` to define
the optimization problem to interface with pymoo.
This assumes maximization of all objectives.
Args:
n_var: The number of tunable parameters (``d``).
n_obj: The number of objectives.
xl: A ``d``-dim np.ndarray of lower bounds for each tunable parameter.
xu: A ``d``-dim np.ndarray of upper bounds for each tunable parameter.
acqf: A MultiOutputAcquisitionFunction.
dtype: The torch dtype.
device: The torch device.
acqf: The acquisition function to optimize.
ref_point: A list or tensor with ``m`` elements representing the
reference point (in the outcome space), which is treated as a
lower bound on the objectives, after applying ``objective`` to
the samples.
objective: The MCMultiOutputObjective under which the samples are
evaluated. Defaults to ``IdentityMultiOutputObjective()``.
This can be used to determine which outputs of the
MultiOutputAcquisitionFunction should be used as
objectives/constraints in NSGA-II.
constraints: A list of callables, each mapping a Tensor of dimension
``sample_shape x batch-shape x q x m`` to a Tensor of dimension
``sample_shape x batch-shape x q``, where negative values imply
feasibility.
inequality_constraints: A list of tuples (indices, coefficients, rhs),
representing inequality constraints of the form
``sum_i (X[indices[i]] * coefficients[i]) >= rhs``. These are
parameter-space constraints (as opposed to outcome-space
constraints).
"""
num_constraints = 0 if constraints is None else len(constraints)
if ref_point is not None:
num_constraints += ref_point.shape[0]
if inequality_constraints is not None:
num_constraints += len(inequality_constraints)
super().__init__(
n_var=n_var,
n_obj=n_obj,
n_ieq_constr=num_constraints,
xl=xl,
xu=xu,
type_var=np.double,
)
self.botorch_acqf = acqf
self.botorch_ref_point = ref_point
self.botorch_objective = (
IdentityMCMultiOutputObjective() if objective is None else objective
)
self.botorch_constraints = constraints
self.botorch_inequality_constraints = inequality_constraints
self.torch_dtype = dtype
self.torch_device = device
def _evaluate(self, x: np.ndarray, out: dict[str, np.ndarray]) -> None:
"""Evaluate x with respect to the objective/constraints."""
X = torch.from_numpy(x).to(dtype=self.torch_dtype, device=self.torch_device)
with torch.no_grad():
# eval in batch mode, since all we need is the mean and this helps
# avoid ill-conditioning
y = self.botorch_acqf(X=X.unsqueeze(-2))
obj = self.botorch_objective(y)
# negate the objectives, since we want to maximize this function
out["F"] = -obj.cpu().numpy()
constraint_vals = None
if self.botorch_constraints is not None:
constraint_vals = torch.stack(
[c(y) for c in self.botorch_constraints], dim=-1
)
if self.botorch_ref_point is not None:
# add constraints for the ref point
ref_constraints = self.botorch_ref_point - obj
if constraint_vals is not None:
constraint_vals = torch.cat(
[constraint_vals, ref_constraints], dim=-1
)
else:
constraint_vals = ref_constraints
# Handle parameter-space inequality constraints
# These are of the form sum_i (X[indices[i]] * coefficients[i]) >= rhs
# PyMOO expects constraints in the form G(x) <= 0, so we transform:
# sum_i (X[indices[i]] * coefficients[i]) >= rhs
# => rhs - sum_i (X[indices[i]] * coefficients[i]) <= 0
if self.botorch_inequality_constraints is not None:
ineq_constraint_vals = []
for indices, coefficients, rhs in self.botorch_inequality_constraints:
# X is (pop_size, n_var), indices is (num_terms,)
# Extract relevant columns and compute linear combination
lhs = (X[:, indices] * coefficients).sum(dim=-1)
# Transform to G(x) <= 0 form: rhs - lhs <= 0
ineq_constraint_vals.append(rhs - lhs)
ineq_constraints = torch.stack(ineq_constraint_vals, dim=-1)
if constraint_vals is not None:
constraint_vals = torch.cat(
[constraint_vals, ineq_constraints], dim=-1
)
else:
constraint_vals = ineq_constraints
if constraint_vals is not None:
out["G"] = constraint_vals.cpu().numpy()
[docs]
def optimize_with_nsgaii(
acq_function: MultiOutputAcquisitionFunction,
bounds: Tensor,
num_objectives: int,
q: int | None = None,
ref_point: list[float] | Tensor | None = None,
objective: MCMultiOutputObjective | None = None,
constraints: list[Callable[[Tensor], Tensor]] | None = None,
inequality_constraints: list[tuple[Tensor, Tensor, float]] | None = None,
population_size: int = 250,
max_gen: int | None = None,
seed: int | None = None,
fixed_features: dict[int, float] | None = None,
max_attempts: int = 2,
discrete_choices: dict[int, list[float]] | None = None,
post_processing_func: Callable[[Tensor], Tensor] | None = None,
) -> tuple[Tensor, Tensor]:
"""Optimize the posterior mean via NSGA-II, returning the Pareto set and front.
This assumes maximization of all objectives.
Args:
acq_function: The MultiOutputAcquisitionFunction to optimize.
bounds: A ``2 x d`` tensor of lower and upper bounds for each column of
``X``.
q: The number of candidates. If None, return the full population.
num_objectives: The number of objectives.
ref_point: A list or tensor with ``m`` elements representing the reference
point (in the outcome space), which is treated as a lower bound
on the objectives, after applying ``objective`` to the samples.
objective: The MCMultiOutputObjective under which the samples are
evaluated. Defaults to ``IdentityMultiOutputObjective()``.
This can be used to determine which outputs of the
MultiOutputAcquisitionFunction should be used as
objectives/constraints in NSGA-II.
constraints: A list of callables, each mapping a Tensor of dimension
``sample_shape x batch-shape x q x m`` to a Tensor of dimension
``sample_shape x batch-shape x q``, where negative values imply
feasibility.
inequality_constraints: A list of tuples (indices, coefficients, rhs),
representing inequality constraints of the form
``sum_i (X[indices[i]] * coefficients[i]) >= rhs``. These are
parameter-space constraints (as opposed to outcome-space
constraints).
population_size: the population size for NSGA-II.
max_gen: The number of iterations for NSGA-II. If None, this uses the
default termination condition in pymoo for NSGA-II.
seed: The random seed for NSGA-II.
fixed_features: A map ``{feature_index: value}`` for features that
should be fixed to a particular value during generation. All indices
should be non-negative.
max_attempts: The total number of times to run the optimization if it
fails (usually due to NSGA-II failing to find a feasible point).
discrete_choices: A mapping from dimension index to allowed discrete
values. When provided, a repair operator is used during NSGA-II
optimization to ensure discrete dimensions are snapped to their
nearest allowed values after each generation. This provides better
handling of mixed continuous/discrete search spaces compared to
post-hoc rounding. Dimensions in ``fixed_features`` are
automatically excluded.
post_processing_func: A function that post-processes optimization results,
e.g., to round discrete dimensions to valid values. The function
should take an ``n x d`` tensor and return a tensor of the same shape
with post-processed values. When provided, the objective values Y are
re-evaluated after post-processing to ensure accuracy.
Note: Constraint feasibility is not re-checked after post-processing.
NSGA-II enforces constraints on the original (pre-processed) X, but
post-processing (e.g., rounding) could make previously feasible
solutions infeasible. This mirrors the behavior of other optimizers
like ``optimize_acqf``. For parameter-space constraints, use
Ax-level validation (e.g., ``validate_candidates``) as a safety net.
Returns:
A two-element tuple containing the pareto set X and pareto frontier Y.
"""
tkwargs = {"dtype": bounds.dtype, "device": bounds.device}
if ref_point is not None:
ref_point = torch.as_tensor(ref_point, **tkwargs)
if fixed_features is not None:
bounds = bounds.clone()
# set lower and upper bounds to the fixed value
for i, val in fixed_features.items():
bounds[:, i] = val
# Filter out fixed features from discrete_choices to avoid
# redundant processing for dimensions that are pinned.
if discrete_choices is not None and fixed_features is not None:
discrete_choices = {
k: v for k, v in discrete_choices.items() if k not in fixed_features
}
# Create repair operator for discrete parameters if needed
repair = None
if discrete_choices:
repair = DiscreteParameterRepair(
discrete_choices={k: list(v) for k, v in discrete_choices.items()}
)
def _opt_with_nsgaii():
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=DeprecationWarning)
pymoo_problem = BotorchPymooProblem(
n_var=bounds.shape[-1],
n_obj=num_objectives,
xl=bounds[0].cpu().numpy(),
xu=bounds[1].cpu().numpy(),
acqf=acq_function,
ref_point=ref_point,
objective=objective,
constraints=constraints,
inequality_constraints=inequality_constraints,
**tkwargs,
)
pop_size = max(population_size, q) if q is not None else population_size
algorithm = NSGA2(
pop_size=pop_size,
eliminate_duplicates=True,
repair=repair,
)
res = minimize(
problem=pymoo_problem,
algorithm=algorithm,
termination=(
None
if max_gen is None
else MaximumGenerationTermination(n_max_gen=max_gen)
),
seed=seed,
verbose=False,
)
return res
# run optimization with retries in case NSGA-II fails to find a feasible point
for i in range(1, max_attempts + 1):
res = _opt_with_nsgaii()
if res.X is not None:
break
if i == max_attempts:
raise RuntimeError(
f"NSGA-II failed to find a feasible point after {max_attempts} "
f"attempts. Consider relaxing the constraints or increasing "
"the population size."
)
X = torch.tensor(res.X, **tkwargs)
# Apply post-processing (e.g., rounding discrete dimensions)
if post_processing_func is not None:
X = post_processing_func(X)
# Re-evaluate objectives since X has changed after post-processing.
# This ensures Y values are accurate for the post-processed X.
botorch_objective = (
IdentityMCMultiOutputObjective() if objective is None else objective
)
with torch.no_grad():
Y = botorch_objective(acq_function(X.unsqueeze(-2)))
else:
# multiply by negative one to return the correct sign for maximization
Y = -torch.tensor(res.F, **tkwargs)
pareto_mask = is_non_dominated(Y, deduplicate=True)
X_pareto = X[pareto_mask]
Y_pareto = Y[pareto_mask]
if q is not None:
if Y_pareto.shape[0] > q:
Y_pareto, indices = get_hypervolume_maximizing_subset(
# use nadir as reference point since we likely don't care about the
# extrema as much as the interior
n=q,
Y=Y_pareto,
ref_point=Y_pareto.min(dim=0).values,
)
X_pareto = X_pareto[indices]
elif Y_pareto.shape[0] < q:
n_missing = q - Y_pareto.shape[0]
if Y.shape[0] >= q:
# select some dominated solutions
rand_idcs = np.random.choice(
(~pareto_mask).nonzero().view(-1).cpu().numpy(),
n_missing,
replace=False,
)
rand_idcs = torch.from_numpy(rand_idcs).to(
device=pareto_mask.device
)
pareto_mask[rand_idcs] = 1
X_pareto = X[pareto_mask]
Y_pareto = Y[pareto_mask]
else:
warnings.warn(
f"NSGA-II only returned {Y.shape[0]} points.",
BotorchWarning,
stacklevel=3,
)
return X, Y
return X_pareto, Y_pareto
except ImportError: # pragma: no cover
pass