Source code for botorch.utils.testing

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from __future__ import annotations

import math
import warnings
from abc import abstractmethod
from collections import OrderedDict
from collections.abc import Callable, Sequence
from itertools import product
from typing import Any
from unittest import mock, TestCase
from warnings import warn

import torch
from botorch.acquisition.objective import PosteriorTransform
from botorch.exceptions.warnings import (
    BotorchTensorDimensionWarning,
    InputDataWarning,
    NumericsWarning,
)
from botorch.models.model import FantasizeMixin, Model
from botorch.posteriors.gpytorch import GPyTorchPosterior
from botorch.posteriors.posterior import Posterior
from botorch.sampling.base import MCSampler
from botorch.sampling.get_sampler import GetSampler
from botorch.sampling.stochastic_samplers import StochasticSampler
from botorch.test_functions.base import (
    BaseTestProblem,
    ConstrainedBaseTestProblem,
    CorruptedTestProblem,
    MultiObjectiveTestProblem,
)
from botorch.test_functions.synthetic import Rosenbrock
from botorch.utils.transforms import unnormalize
from gpytorch.distributions import MultitaskMultivariateNormal, MultivariateNormal
from linear_operator.operators import AddedDiagLinearOperator, DiagLinearOperator
from torch import Tensor


EMPTY_SIZE = torch.Size()


[docs] def skip_if_import_error(func: Callable) -> Callable: def f(*args, **kwargs): try: return func(*args, **kwargs) except ImportError as e: warn( "Skipping test because module is not installed. Received the " f"following error: {e}" ) return f
[docs] def sample_random_feasible( f: BaseTestProblem, dtype: torch.dtype, device: torch.device ) -> Tensor: r"""Sample random feasible point for the given test function. Args: f: The test function instance. dtype: The dtype of the random point. device: The device of the random point. Returns: A random feasible point of shape ``1 x f.dim``. """ round_ids = f.discrete_inds + f.categorical_inds if isinstance(f, ConstrainedBaseTestProblem): # Sample a bunch of points and hope that one of them is feasible. # We could repeat this in a loop but it is not worth risking the # tests hanging forever. If no feasible point is found, we can bypass the test. X = unnormalize( torch.rand(2**12, f.dim, dtype=dtype, device=device), bounds=f.bounds, ) X[..., round_ids] = X[..., round_ids].round() feasible = (f.evaluate_slack(X) >= 0).all(dim=-1) if feasible.any(): return X[feasible][0] else: # pragma: no cover raise RuntimeError( f"No feasible point found for {f.__class__.__name__}. Skipping test." ) X = unnormalize( torch.rand(1, f.dim, dtype=dtype, device=device), bounds=f.bounds, ) X[..., round_ids] = X[..., round_ids].round() return X
[docs] class BotorchTestCase(TestCase): r"""Basic test case for Botorch. This 1. sets the default device to be ``torch.device("cpu")`` 2. ensures that no warnings are suppressed by default. """ device = torch.device("cpu")
[docs] def setUp(self, suppress_input_warnings: bool = True) -> None: """Set up the test case. Args: suppress_input_warnings: If True, suppress common input warnings (see below). """ warnings.resetwarnings() warnings.simplefilter("always", append=True) if suppress_input_warnings: warnings.filterwarnings( "ignore", message="The model inputs are of type", category=InputDataWarning, ) warnings.filterwarnings( "ignore", message="Non-strict enforcement of botorch tensor conventions.", category=BotorchTensorDimensionWarning, ) warnings.filterwarnings( "ignore", message=r"Data \(outcome observations\) is not standardized ", category=InputDataWarning, ) warnings.filterwarnings( "ignore", message=r"Data \(input features\) is not", category=InputDataWarning, ) warnings.filterwarnings( "ignore", message="has known numerical issues", category=NumericsWarning, ) warnings.filterwarnings( "ignore", message="Model converter code is deprecated", category=DeprecationWarning, )
[docs] def assertAllClose( self, input: Any, other: Any, rtol: float = 1e-05, atol: float = 1e-08, equal_nan: bool = False, ) -> None: r"""Assert that two tensors are close. Calls torch.testing.assert_close, using the signature and default behavior of torch.allclose. The formula asserted is abs(input - other) <= atol + rtol * abs(other). Args: input: First tensor or tensor-or-scalar-like to compare other: Second tensor or tensor-or-scalar-like to compare rtol: Relative tolerance atol: Absolute tolerance equal_nan: If True, consider NaN values as equal Example output: AssertionError: Scalars are not close! Absolute difference: 1.0000034868717194 (up to 0.0001 allowed) Relative difference: 0.8348668001940709 (up to 1e-05 allowed) """ # Why not just use the signature and behavior of ``torch.testing.assert_close``? # Because we used ``torch.allclose`` for testing in the past, and the two don't # behave exactly the same. In particular, ``assert_close`` requires both # ``atol`` and ``rtol`` to be set if either one is. torch.testing.assert_close( input, other, rtol=rtol, atol=atol, equal_nan=equal_nan, )
[docs] class BaseTestProblemTestCaseMixIn: r"""Mixin for testing BaseTestProblem (functions) implementations."""
[docs] def test_forward_and_evaluate_true(self): r"""Run every BaseTestProblem in ``self.functions`` on random inputs. Runs both ``forward`` and ``evaluate_true``. """ dtypes = (torch.float, torch.double) batch_shapes = (torch.Size(), torch.Size([2]), torch.Size([2, 3])) for dtype, batch_shape, f in product(dtypes, batch_shapes, self.functions): f.to(device=self.device, dtype=dtype) X = torch.rand(*batch_shape, f.dim, device=self.device, dtype=dtype) X = f.bounds[0] + X * (f.bounds[1] - f.bounds[0]) for inds in [f.discrete_inds, f.categorical_inds]: X[..., inds] = X[..., inds].round() res_forward = f(X) res_evaluate_true = f.evaluate_true(X) # Evaluating outside bounds should raise X_out_of_bounds = f.bounds[1:, :] + 1 with self.assertRaisesRegex( ValueError, "Expected `X` to be within the bounds of the test problem." ): f(X_out_of_bounds) for method, res in { "forward": res_forward, "evaluate_true": res_evaluate_true, }.items(): with self.subTest( f"{dtype}_{batch_shape}_{f.__class__.__name__}_{method}" ): self.assertEqual(res.dtype, dtype) self.assertEqual(res.device.type, self.device.type) tail_shape = torch.Size( [f.num_objectives] if f.num_objectives > 1 else [] ) self.assertEqual(res.shape, batch_shape + tail_shape)
@property @abstractmethod def functions(self) -> Sequence[BaseTestProblem]: r"""The functions that should be tested. Typically defined as a class attribute on the test case subclassing this class. """
[docs] class SyntheticTestFunctionTestCaseMixin: r"""Mixin for testing synthetic ``BaseTestProblem`` aka test functions."""
[docs] def test_optimal_value(self): """Test that a function's optimal_value is correctly computed, and defined if it should be. """ for dtype in (torch.float, torch.double): for f in self.functions: f.to(device=self.device, dtype=dtype) if f._optimal_value is None: with self.assertRaisesRegex(NotImplementedError, "optimal value"): f.optimal_value else: optval = f.optimal_value optval_exp = -f._optimal_value if f.negate else f._optimal_value self.assertEqual(optval, optval_exp)
[docs] def test_optimizer(self): r"""Test that optimizers are correctly computed and the optimizer value is better than the function value at some random point. """ for dtype in (torch.float, torch.double): for f in self.functions: f.to(device=self.device, dtype=dtype) try: Xopt = f.optimizers.clone().requires_grad_(True) except NotImplementedError: continue res = f(Xopt, noise=False) # if we have optimizers, we have the optimal value res_exp = torch.full_like(res, f.optimal_value) self.assertAllClose(res, res_exp, atol=1e-3, rtol=1e-3) if f._check_grad_at_opt: grad = torch.autograd.grad([*res], Xopt)[0] self.assertLess(grad.abs().max().item(), 1e-3) # Check that the optimizer is better than (or equal to) a random point. try: random_point = sample_random_feasible( f=f, dtype=dtype, device=self.device ) except RuntimeError: # pragma: no cover # If no feasible point is found, we can skip the test. # Infeasible points can have better than optimal values. continue f_random = f(random_point, noise=False).item() f_opt = res[0].item() if f.is_minimization_problem: self.assertLessEqual(f_opt, f_random) else: self.assertGreaterEqual(f_opt, f_random)
@property @abstractmethod def functions(self) -> Sequence[BaseTestProblem]: """The functions that should be tested. Typically defined as a class attribute on the test case subclassing this class. """ pass # pragma: no cover
[docs] class MultiObjectiveTestProblemTestCaseMixin: r"""Mixin for testing multi-objective test problems. This class provides test cases for attributes, maximum hypervolume, and reference points of multi-objective test problems. """
[docs] def test_attributes(self): r"""Test that each function has the required attributes.""" for f in self.functions: self.assertTrue(hasattr(f, "dim")) self.assertTrue(hasattr(f, "num_objectives")) self.assertEqual(f.bounds.shape, torch.Size([2, f.dim]))
[docs] def test_max_hv(self): r"""Test the maximum hypervolume (max_hv) attribute for each function.""" for dtype in (torch.float, torch.double): for f in self.functions: f.to(device=self.device, dtype=dtype) if f._max_hv is None: with self.assertRaises(NotImplementedError): f.max_hv else: self.assertEqual(f.max_hv, f._max_hv)
[docs] def test_ref_point(self): """Test the reference point (ref_point) attribute for each function (for each dtype). """ for dtype in (torch.float, torch.double): for f in self.functions: f.to(dtype=dtype, device=self.device) self.assertTrue( torch.allclose( f.ref_point, torch.tensor(f._ref_point, dtype=dtype, device=self.device), ) )
@property @abstractmethod def functions(self) -> Sequence[BaseTestProblem]: """The functions that should be tested. Typically defined as a class attribute on the test case subclassing this class. """ pass # pragma: no cover
[docs] class ConstrainedTestProblemTestCaseMixin: """Mixin for testing constrained test problems. This class provides test cases for attributes and methods of constrained test problems, including testing the number of constraints and the evaluation of constraint slack. """
[docs] def test_num_constraints(self): """Test that each function has the required num_constraints attribute.""" for f in self.functions: self.assertTrue(hasattr(f, "num_constraints"))
[docs] def test_evaluate_slack(self): """Test the evaluate_slack method for each function. This test verifies that: 1. The evaluate_slack_true and evaluate_slack methods return tensors of the expected shape 2. The relationship between evaluate_slack and evaluate_slack_true is consistent with the constraint_noise_std attribute of the function """ for dtype in (torch.float, torch.double): for f in self.functions: f.to(device=self.device, dtype=dtype) X = unnormalize( torch.rand(1, f.dim, device=self.device, dtype=dtype), bounds=f.bounds, ) slack_true = f.evaluate_slack_true(X) # Mock out the random generator to ensure that noise realizations are # sizable so we don't run into any floating point comparison issues. with mock.patch( "botorch.test_functions.base.torch.randn_like", side_effect=lambda y: y, ): slack_observed = f.evaluate_slack(X) self.assertEqual(slack_true.shape, torch.Size([1, f.num_constraints])) self.assertEqual( slack_observed.shape, torch.Size([1, f.num_constraints]) ) is_equal = (slack_observed == slack_true).bool() if isinstance(f.constraint_noise_std, float): self.assertEqual( is_equal.all().item(), f.constraint_noise_std == 0.0 ) elif isinstance(f.constraint_noise_std, list): for i, noise_std in enumerate(f.constraint_noise_std): self.assertEqual( is_equal[:, i].item(), noise_std in (0.0, None) ) else: self.assertTrue(is_equal.all().item())
[docs] def test_worst_feasible_value(self): """Test that a function's worst_feasible_value is correctly computed, and defined if it should be. """ for dtype in (torch.float, torch.double): for f in self.functions: f.to(device=self.device, dtype=dtype) if f._worst_feasible_value is None: self.assertTrue(isinstance(f, MultiObjectiveTestProblem)) self.assertGreaterEqual(f.worst_feasible_value, 0.0) else: worst_feas_val = f.worst_feasible_value worst_feas_val_exp = ( -f._worst_feasible_value if f.negate else f._worst_feasible_value ) self.assertEqual(worst_feas_val, worst_feas_val_exp)
@property @abstractmethod def functions(self) -> Sequence[BaseTestProblem]: r"""The functions that should be tested. Typically defined as a class attribute on the test case subclassing this class. """ pass # pragma: no cover
[docs] class TestCorruptedProblemsMixin(BotorchTestCase): r"""Mixin for testing corrupted test problems. This class provides setup and utility functions for testing corrupted test problems using a specified outlier generator and a Rosenbrock problem. """
[docs] def setUp(self, suppress_input_warnings: bool = True) -> None: r"""Set up the test case with a dummy outlier generator and a Rosenbrock problem. Args: suppress_input_warnings: If True, suppress common input warnings. """ super().setUp(suppress_input_warnings=suppress_input_warnings) def outlier_generator( problem: torch.Tensor | BaseTestProblem, X: Any, bounds: Any ) -> torch.Tensor: r"""Generate outliers for the given problem. Args: problem: The test problem. X: Input tensor. bounds: Bounds for the input. Returns: A tensor of ones with the same shape as the input. """ return torch.ones(X.shape[0]) self.outlier_generator = outlier_generator self.rosenbrock_problem = CorruptedTestProblem( base_test_problem=Rosenbrock(), outlier_fraction=1.0, outlier_generator=outlier_generator, seeds=[1, 2], )
[docs] class MockPosterior(Posterior): r"""This class is used to simulate a posterior with specified mean, variance, and samples. Everything is deterministic in this class. """ def __init__( self, mean: torch.Tensor | None = None, variance: torch.Tensor | None = None, samples: torch.Tensor | None = None, base_shape: torch.Size | None = None, batch_range: tuple[int, int] | None = None, ) -> None: r"""Initialize the MockPosterior with specified attributes. Args: mean: The mean of the posterior. variance: The variance of the posterior. samples: Samples to return from ``rsample``, unless ``base_samples`` is provided. base_shape: If given, this is returned as ``base_sample_shape``, and also used as the base of the ``_extended_shape``. batch_range: If given, this is returned as ``batch_range``. Defaults to (0, -2). """ self._mean = mean self._variance = variance self._samples = samples self._base_shape = base_shape self._batch_range = batch_range or (0, -2) @property def device(self) -> torch.device: r"""Return the device of the posterior.""" for t in (self._mean, self._variance, self._samples): if torch.is_tensor(t): return t.device return torch.device("cpu") @property def dtype(self) -> torch.dtype: r"""Return the data type of the posterior.""" for t in (self._mean, self._variance, self._samples): if torch.is_tensor(t): return t.dtype return torch.float32 @property def batch_shape(self) -> torch.Size: r"""Return the batch shape of the posterior.""" for t in (self._mean, self._variance, self._samples): if torch.is_tensor(t): return t.shape[:-2] raise NotImplementedError # pragma: no cover def _extended_shape( self, sample_shape: torch.Size = torch.Size(), # noqa: B008 ) -> torch.Size: r"""Return the extended shape of the posterior.""" return sample_shape + self.base_sample_shape @property def base_sample_shape(self) -> torch.Size: r"""Return the base sample shape of the posterior.""" if self._base_shape is not None: return self._base_shape if self._samples is not None: return self._samples.shape if self._mean is not None: return self._mean.shape if self._variance is not None: return self._variance.shape return torch.Size() @property def batch_range(self) -> tuple[int, int]: r"""Return the batch range of the posterior.""" return self._batch_range @property def mean(self): r"""Return the mean of the posterior.""" return self._mean @property def variance(self): r"""Return the variance of the posterior.""" return self._variance
[docs] def rsample( self, sample_shape: torch.Size | None = None, ) -> Tensor: """Return mock samples by extending the shape of the initially specified samples. Args: sample_shape: The shape of the samples to generate. Returns: A tensor of samples with the specified shape. """ if sample_shape is None: sample_shape = torch.Size() extended_shape = self._extended_shape(sample_shape) return self._samples.expand(extended_shape)
[docs] def rsample_from_base_samples( self, sample_shape: torch.Size, base_samples: Tensor, ) -> Tensor: if base_samples.shape[: len(sample_shape)] != sample_shape: raise RuntimeError( "`sample_shape` disagrees with shape of `base_samples`. " f"Got {sample_shape=} and {base_samples.shape=}." ) return self.rsample(sample_shape)
[docs] @GetSampler.register(MockPosterior) def get_sampler_mock( posterior: MockPosterior, sample_shape: torch.Size, **kwargs: Any ) -> MCSampler: """Get a ``StochasticSampler`` with the specified ``sample_shape``. Args: posterior: Used only for dispatching so that ``get_sampler`` works with a ``MockPosterior``. sample_shape: The shape of the samples to generate. kwargs: Passed to ``StochasticSampler`` Returns: A ``StochasticSampler`` for the mock posterior. """ return StochasticSampler(sample_shape=sample_shape, **kwargs)
[docs] class MockModel(Model, FantasizeMixin): """Mock ``Model`` that implements dummy methods and feeds through specified outputs. Its ``posterior`` is a ``MockPosterior``. """ def __init__(self, posterior: MockPosterior) -> None: # noqa: D107 r"""Initialize the MockModel with a specified posterior. Args: posterior: The mock posterior to use for the model. """ super(Model, self).__init__() self._posterior = posterior
[docs] def posterior( self, X: Tensor, output_indices: list[int] | None = None, posterior_transform: PosteriorTransform | None = None, observation_noise: bool | torch.Tensor = False, ) -> MockPosterior: r"""Return the posterior of the model. Args: X: Ignored; present for compatibility with super class. output_indices: Ignored; present for compatibility with super class. posterior_transform: Optional. observation_noise: Ignored; present for compatibility with super class. Returns: The posterior of the model, possibly transformed. """ if posterior_transform is not None: return posterior_transform(self._posterior) else: return self._posterior
@property def num_outputs(self) -> int: r"""Return the number of outputs of the model.""" extended_shape = self._posterior._extended_shape() return extended_shape[-1] if len(extended_shape) > 0 else 0 @property def batch_shape(self) -> torch.Size: r"""Return the batch shape of the model.""" extended_shape = self._posterior._extended_shape() return extended_shape[:-2]
[docs] def state_dict(self, *args, **kwargs) -> None: """Dummy method, has no effect""" pass
[docs] def load_state_dict( self, state_dict: OrderedDict | None = None, strict: bool = False ) -> None: """Dummy method, has no effect. Args: state_dict: The state dictionary to load. strict: Whether to strictly enforce that the keys in state_dict match the keys returned by this module's state_dict function. """ pass
[docs] class MockAcquisitionFunction: r"""Mock acquisition function object that implements dummy methods.""" def __init__(self): # noqa: D107 """ Initialize the MockAcquisitionFunction. This function does not really do anything, but it takes an input of shape (b,q,d) and returns a tensor of shape (b,). """ self.model = None self.X_pending = None self._call_args = {"__call__": [], "set_X_pending": []} def __call__(self, X): self._call_args["__call__"].append(X) return X[..., 0].max(dim=-1).values
[docs] def set_X_pending(self, X_pending: Tensor | None = None): self._call_args["set_X_pending"].append(X_pending) self.X_pending = X_pending
[docs] def get_random_data( batch_shape: torch.Size, m: int, d: int = 1, n: int = 10, **tkwargs ) -> tuple[Tensor, Tensor]: r"""Generate random data for testing purposes. Args: batch_shape: The batch shape of the data. m: The number of outputs. d: The dimension of the input. n: The number of data points. tkwargs: ``device`` and ``dtype`` tensor constructor kwargs. Returns: A tuple ``(train_X, train_Y)`` with randomly generated training data. """ rep_shape = batch_shape + torch.Size([1, 1]) train_x = torch.stack( [torch.linspace(0, 0.95, n, **tkwargs) for _ in range(d)], dim=-1 ) train_x = train_x + 0.05 * torch.rand_like(train_x).repeat(rep_shape) train_x[0] += 0.02 # modify the first batch train_y = torch.sin(train_x[..., :1] * (2 * math.pi)) train_y = train_y + 0.2 * torch.randn(n, m, **tkwargs).repeat(rep_shape) return train_x, train_y
[docs] def get_test_posterior( batch_shape: torch.Size, q: int = 1, m: int = 1, interleaved: bool = True, lazy: bool = False, independent: bool = False, **tkwargs, ) -> GPyTorchPosterior: r"""Generate a Posterior for testing purposes. Args: batch_shape: The batch shape of the data. q: The number of candidates m: The number of outputs. interleaved: A boolean indicating the format of the MultitaskMultivariateNormal lazy: A boolean indicating if the posterior should be lazy independent: A boolean indicating whether the outputs are independent tkwargs: ``device`` and ``dtype`` tensor constructor kwargs. """ if independent: mvns = [] for _ in range(m): mean = torch.rand(*batch_shape, q, **tkwargs) a = torch.rand(*batch_shape, q, q, **tkwargs) covar = a @ a.transpose(-1, -2) flat_diag = torch.rand(*batch_shape, q, **tkwargs) covar = covar + torch.diag_embed(flat_diag) mvns.append(MultivariateNormal(mean, covar)) mtmvn = MultitaskMultivariateNormal.from_independent_mvns(mvns) else: mean = torch.rand(*batch_shape, q, m, **tkwargs) a = torch.rand(*batch_shape, q * m, q * m, **tkwargs) covar = a @ a.transpose(-1, -2) flat_diag = torch.rand(*batch_shape, q * m, **tkwargs) if lazy: covar = AddedDiagLinearOperator(covar, DiagLinearOperator(flat_diag)) else: covar = covar + torch.diag_embed(flat_diag) mtmvn = MultitaskMultivariateNormal(mean, covar, interleaved=interleaved) return GPyTorchPosterior(mtmvn)
[docs] def get_max_violation_of_bounds(samples: torch.Tensor, bounds: torch.Tensor) -> float: """ The maximum value by which samples lie outside bounds. A negative value indicates that all samples lie within bounds. Args: samples: An ``n x q x d`` - dimension tensor, as might be returned from ``sample_q_batches_from_polytope``. bounds: A ``2 x d`` tensor of lower and upper bounds for each column. """ n, q, d = samples.shape samples = samples.reshape((n * q, d)) lower = samples.min(0).values upper = samples.max(0).values lower_dist = (bounds[0, :] - lower).max().item() upper_dist = (upper - bounds[1, :]).max().item() return max(lower_dist, upper_dist)
[docs] def get_max_violation_of_constraints( samples: torch.Tensor, constraints: list[tuple[Tensor, Tensor, float]] | None, equality: bool, ) -> float: r""" Amount by which equality constraints are not obeyed. Args: samples: An ``n x q x d`` - dimension tensor, as might be returned from ``sample_q_batches_from_polytope``. constraints: A list of tuples (indices, coefficients, rhs), with each tuple encoding an inequality constraint of the form ``\sum_i (X[indices[i]] * coefficients[i]) = rhs``, or ``>=`` if ``equality`` is False. equality: Whether these are equality constraints (not inequality). """ n, q, d = samples.shape max_error = 0 if constraints is not None: for ind, coef, rhs in constraints: if ind.ndim == 1: constr = samples[:, :, ind] @ coef else: constr = samples[:, ind[:, 0], ind[:, 1]] @ coef if equality: error = (constr - rhs).abs().max() else: error = (rhs - constr).max() max_error = max(max_error, error) return max_error