#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
r"""
Base class for test functions for optimization benchmarks.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Iterable, Iterator
from typing import Any, Protocol
import torch
from botorch.exceptions.errors import InputDataError, UnsupportedError
from pyre_extensions import none_throws
from torch import Tensor
from torch.nn import Module
[docs]
def validate_parameter_indices(
dim: int,
bounds: Tensor,
continuous_inds: list[int],
discrete_inds: list[int],
categorical_inds: list[int],
) -> None:
r"""Check that the parameter indices are valid.
Args:
dim: Number of search space dimensions.
bounds: A ``2 x d``-dim tensor of lower and upper bounds.
continuous_inds: List of unique integers corresponding to continuous parameters.
discrete_inds: List of unique integers corresponding to discrete parameters.
categorical_inds: List of unique integers corresponding to categorical
parameters.
Raises:
ValueError: If the parameter indices are invalid.
"""
for inds in [continuous_inds, discrete_inds, categorical_inds]:
if len(inds) == 0:
continue # Nothing to check
if not (
isinstance(inds, list)
and all(isinstance(x, int) for x in inds)
and len(set(inds)) == len(inds)
and min(inds) >= 0
and max(inds) <= dim - 1
):
raise ValueError(
"All parameter indices must be a list with unique integers between "
f"0 and dim - 1. Got {inds=}."
)
# Let's make sure all parameters are covered.
all_inds = continuous_inds + discrete_inds + categorical_inds
if (len(all_inds) != dim) or (set(all_inds) != set(range(dim))):
raise ValueError(
f"All parameter indices must be present, got {dim=}, {continuous_inds=}, "
f"{discrete_inds=}, {categorical_inds=}"
)
# Check the shape of the bounds
if not bounds.shape == (2, dim):
raise ValueError(
f"Expected `bounds` to have shape `2 x d`. Got {bounds.shape=}, {dim=}."
)
# Check that the bounds are integer valued for discrete and categorical parameters.
for inds in [discrete_inds, categorical_inds]:
if len(inds) > 0 and (not (bounds[:, inds] == bounds[:, inds].round()).all()):
raise ValueError(
"Expected the lower and upper bounds of the discrete and categorical "
"parameters to be integer-valued."
)
[docs]
class BaseTestProblem(Module, ABC):
r"""Base class for test functions."""
dim: int
_bounds: list[
tuple[float, float]
] # Bounds, must be integers for discrete/categorical parameters
_check_grad_at_opt: bool = True
continuous_inds: list[int] = [] # Float-valued range parameters (bounds inclusive)
discrete_inds: list[int] = [] # Ordered integer parameters (bounds inclusive)
categorical_inds: list[int] = [] # Unordered integer parameters (bounds inclusive)
# Whether the problem is a minimization problem by default, with ``negate=False``.
_is_minimization_by_default: bool = True
def __init__(
self,
noise_std: None | float | list[float] = None,
negate: bool = False,
dtype: torch.dtype = torch.double,
) -> None:
r"""Base constructor for test functions.
Args:
noise_std: Standard deviation of the observation noise. If a list is
provided, specifies separate noise standard deviations for each
objective in a multiobjective problem.
negate: If True, negate the function.
dtype: The dtype that is used for the bounds of the function.
"""
super().__init__()
self.noise_std = noise_std
self.negate = negate
if len(self._bounds) != self.dim:
raise InputDataError(
"Expected the bounds to match the dimensionality of the domain. "
f"Got {self.dim=} and {len(self._bounds)=}."
)
self.register_buffer(
"bounds",
torch.tensor(self._bounds, dtype=dtype).transpose(-1, -2),
)
validate_parameter_indices(
dim=self.dim,
bounds=self.bounds,
continuous_inds=self.continuous_inds,
discrete_inds=self.discrete_inds,
categorical_inds=self.categorical_inds,
)
[docs]
def forward(self, X: Tensor, noise: bool = True) -> Tensor:
r"""Evaluate the function on a set of points.
Args:
X: A ``(batch_shape) x d``-dim tensor of point(s) at which to evaluate
the function.
noise: If ``True``, add observation noise as specified by ``noise_std``.
Returns:
A ``batch_shape``-dim tensor of function evaluations.
"""
f = self.evaluate_true(X=X)
if noise and self.noise_std is not None:
_noise = torch.tensor(self.noise_std, device=X.device, dtype=X.dtype)
f += _noise * torch.randn_like(f)
if self.negate:
f = -f
return f
[docs]
def evaluate_true(self, X: Tensor) -> Tensor:
r"""
Evaluate the function (w/o observation noise) on a set of points.
Args:
X: A ``(batch_shape) x d``-dim tensor of point(s) at which to
evaluate.
Returns:
A ``batch_shape``-dim tensor.
"""
validate_inputs(
X=X,
dim=self.dim,
bounds=self.bounds,
discrete_inds=self.discrete_inds,
categorical_inds=self.categorical_inds,
)
return self._evaluate_true(X=X)
@abstractmethod
def _evaluate_true(self, X: Tensor) -> Tensor:
r"""Evaluate the function (w/o observation noise) on a set of points.
Args:
X: A ``(batch_shape) x d``-dim tensor of point(s) at which to
evaluate.
Returns:
A ``batch_shape``-dim tensor.
"""
pass # pragma: no cover
@property
def is_minimization_problem(self) -> bool:
r"""Whether the problem is a minimization problem, after accounting
for the ``negate`` option.
"""
return (
self._is_minimization_by_default
if not self.negate
else not self._is_minimization_by_default
)
[docs]
class ConstrainedBaseTestProblem(BaseTestProblem, ABC):
r"""Base class for test functions with constraints.
In addition to one or more objectives, a problem may have a number of outcome
constraints of the form ``c_i(x) >= 0`` for ``i=1, ..., n_c``.
This base class provides common functionality for such problems.
"""
num_constraints: int
_check_grad_at_opt: bool = False
constraint_noise_std: None | float | list[float] = None
_worst_feasible_value: float | None = None
[docs]
def evaluate_slack(self, X: Tensor, noise: bool = True) -> Tensor:
r"""Evaluate the constraint slack on a set of points.
Constraints ``i`` is assumed to be feasible at ``x`` if the associated slack
``c_i(x)`` is positive. Zero slack means that the constraint is active. Negative
slack means that the constraint is violated.
Args:
X: A ``batch_shape x d``-dim tensor of point(s) at which to evaluate the
constraint slacks: ``c_1(X), ...., c_{n_c}(X)``.
noise: If ``True``, add observation noise to the slack as specified by
``noise_std``.
Returns:
A ``batch_shape x n_c``-dim tensor of constraint slack (where positive slack
corresponds to the constraint being feasible).
"""
cons = self.evaluate_slack_true(X=X)
if noise and self.constraint_noise_std is not None:
_constraint_noise = torch.tensor(
self.constraint_noise_std, device=X.device, dtype=X.dtype
)
cons += _constraint_noise * torch.randn_like(cons)
return cons
[docs]
def is_feasible(self, X: Tensor, noise: bool = True) -> Tensor:
r"""Evaluate whether the constraints are feasible on a set of points.
Args:
X: A ``batch_shape x d``-dim tensor of point(s) at which to evaluate the
constraints.
noise: If ``True``, add observation noise as specified by ``noise_std``.
Returns:
A ``batch_shape``-dim boolean tensor that is ``True`` iff all constraint
slacks (potentially including observation noise) are positive.
"""
return (self.evaluate_slack(X=X, noise=noise) >= 0.0).all(dim=-1)
[docs]
def evaluate_slack_true(self, X: Tensor) -> Tensor:
r"""Evaluate the constraint slack (w/o observation noise) on a set of points.
Args:
X: A ``batch_shape x d``-dim tensor of point(s) at which to evaluate the
constraint slacks: ``c_1(X), ...., c_{n_c}(X)``.
Returns:
A ``batch_shape x n_c``-dim tensor of constraint slack (where positive slack
corresponds to the constraint being feasible).
"""
validate_inputs(
X=X,
dim=self.dim,
bounds=self.bounds,
discrete_inds=self.discrete_inds,
categorical_inds=self.categorical_inds,
)
return self._evaluate_slack_true(X=X)
@abstractmethod
def _evaluate_slack_true(self, X: Tensor) -> Tensor:
r"""Evaluate the constraint slack (w/o observation noise) on a set of points.
Args:
X: A ``batch_shape x d``-dim tensor of point(s) at which to evaluate the
constraint slacks: ``c_1(X), ...., c_{n_c}(X)``.
Returns:
A ``batch_shape x n_c``-dim tensor of constraint slack (where positive slack
corresponds to the constraint being feasible).
"""
pass # pragma: no cover
@property
def worst_feasible_value(self) -> float:
r"""The worst feasible value of the objective function. This is useful when
evaluating the performance of different optimization methods as this value
can be assigned to all infeasible trials. This has the desirable property that
any feasible trial has better performance than an infeasible trial.
"""
if isinstance(self, MultiObjectiveTestProblem):
return 0.0 # Can return 0.0 for MOO since this is the smallest possible HV
elif self._worst_feasible_value is not None:
return (
-self._worst_feasible_value
if self.negate
else self._worst_feasible_value
)
raise NotImplementedError(
f"Problem {self.__class__.__name__} does not specify the "
"worst feasible value."
)
[docs]
class MultiObjectiveTestProblem(BaseTestProblem, ABC):
r"""Base class for multi-objective test functions.
TODO: add a pareto distance function that returns the distance
between a provided point and the closest point on the true pareto front.
"""
num_objectives: int
_ref_point: list[float]
_max_hv: float | None = None
def __init__(
self,
noise_std: None | float | list[float] = None,
negate: bool = False,
dtype: torch.dtype = torch.double,
) -> None:
r"""Base constructor for multi-objective test functions.
Args:
noise_std: Standard deviation of the observation noise. If a list is
provided, specifies separate noise standard deviations for each
objective.
negate: If True, negate the objectives.
"""
if isinstance(noise_std, list) and len(noise_std) != len(self._ref_point):
raise InputDataError(
f"If specified as a list, length of noise_std ({len(noise_std)}) "
f"must match the number of objectives ({len(self._ref_point)})"
)
super().__init__(noise_std=noise_std, negate=negate, dtype=dtype)
ref_point = torch.tensor(self._ref_point, dtype=dtype)
if negate:
ref_point *= -1
self.register_buffer("ref_point", ref_point)
@property
def max_hv(self) -> float:
if self._max_hv is not None:
return self._max_hv
else:
raise NotImplementedError(
f"Problem {self.__class__.__name__} does not specify maximal "
"hypervolume."
)
[docs]
def gen_pareto_front(self, n: int) -> Tensor:
r"""Generate ``n`` pareto optimal points."""
raise NotImplementedError
@property
def is_minimization_problem(self) -> bool:
raise UnsupportedError(
"`is_minimization_problem` is not a valid property for "
"multi-objective problems. "
)
[docs]
class SeedingMixin(ABC):
_seeds: Iterator[int] | None
_current_seed: int | None
@property
def has_seeds(self) -> bool:
return self._seeds is not None
[docs]
def increment_seed(self) -> int:
self._current_seed = next(none_throws(self._seeds))
return none_throws(self._current_seed)
@property
def seed(self) -> int | None:
return self._current_seed
# Outlier problems
[docs]
class OutlierGenerator(Protocol):
def __call__(self, problem: BaseTestProblem, X: Tensor, bounds: Tensor) -> Tensor:
"""Call signature for outlier generators for single-objective problems.
Args:
problem: The test problem.
X: The input tensor.
bounds: The bounds of the test problem.
Returns:
A tensor of outliers with shape X.shape[:-1] (1d if unbatched).
"""
pass # pragma: no cover
[docs]
def constant_outlier_generator(
problem: Any, X: Tensor, bounds: Any, constant: float
) -> Tensor:
"""
Generates outliers that are all the same constant. To be used in conjunction
with ``partial`` to fix the constant value and conform to the
``OutlierGenerator`` protocol.
Example:
>>> generator = partial(constant_outlier_generator, constant=1.0)
Args:
problem: Not used.
X: The ``batch_shape x n x d``-dim inputs. Also determines the number, dtype,
and device of the returned tensor.
bounds: Not used.
constant: The constant value of the outliers.
Returns:
Tensor of shape ``batch_shape x n`` (1d if unbatched).
"""
return torch.full(X.shape[:-1], constant, dtype=X.dtype, device=X.device)
[docs]
class CorruptedTestProblem(BaseTestProblem, SeedingMixin):
def __init__(
self,
base_test_problem: BaseTestProblem,
outlier_generator: OutlierGenerator,
outlier_fraction: float,
bounds: list[tuple[float, float]] | None = None,
seeds: Iterable[int] | None = None,
) -> None:
"""A problem with outliers.
NOTE: Both noise_std and negate will be taken from the base test problem.
Args:
base_test_problem: The base function to be corrupted.
outlier_generator: A function that generates outliers. It will be called
with arguments ``f``, ``X`` and ``bounds``, where ``f`` is the
``base_test_problem``, ``X`` is the
argument passed to the ``forward`` method, and ``bounds``
are as here, and it returns the values of outliers.
outlier_fraction: The fraction of outliers.
bounds: The bounds of the function.
seeds: The seeds to use for the outlier generator. If seeds are provided,
the problem will iterate through the list of seeds, changing the seed
with a call to ``next(seeds)`` with every ``forward`` call. If a list is
provided, it will first be converted to an iterator.
"""
self.dim: int = base_test_problem.dim
self.continuous_inds = base_test_problem.continuous_inds
self.discrete_inds = base_test_problem.discrete_inds
self.categorical_inds = base_test_problem.categorical_inds
self._bounds: list[tuple[float, float]] = (
bounds if bounds is not None else base_test_problem._bounds
)
super().__init__(
noise_std=base_test_problem.noise_std,
negate=base_test_problem.negate,
)
self.base_test_problem = base_test_problem
self.outlier_generator = outlier_generator
self.outlier_fraction = outlier_fraction
self._current_seed: int | None = None
self._seeds: Iterator[int] | None = None if seeds is None else iter(seeds)
def _evaluate_true(self, X: Tensor) -> Tensor:
return self.base_test_problem.evaluate_true(X)
[docs]
def forward(self, X: Tensor, noise: bool = True) -> Tensor:
"""
Generate data at X and corrupt it, if noise is True.
Args:
X: The ``batch_shape x n x d``-dim inputs.
noise: Whether to corrupt the data.
Returns:
A ``batch_shape x n``-dim tensor.
"""
Y = super().forward(X, noise=noise)
if noise:
if self.has_seeds:
self.increment_seed()
torch.manual_seed(self.seed)
corrupt = torch.rand(X.shape[:-1]) < self.outlier_fraction
outliers = self.outlier_generator(
problem=self.base_test_problem, X=X, bounds=self.bounds
)
Y = torch.where(corrupt, outliers, Y)
return Y