Source code for botorch.generation.gen

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

r"""
Candidate generation utilities.
"""

from __future__ import annotations

import time
import warnings
from collections.abc import Callable, Mapping
from functools import partial
from typing import Any, NoReturn

import numpy as np
import numpy.typing as npt
import torch
from botorch.acquisition import AcquisitionFunction
from botorch.exceptions.errors import OptimizationGradientError, UnsupportedError
from botorch.exceptions.warnings import OptimizationWarning
from botorch.generation.utils import _remove_fixed_features_from_optimization
from botorch.logging import logger
from botorch.optim.parameter_constraints import (
    _arrayify,
    make_scipy_bounds,
    make_scipy_linear_constraints,
    make_scipy_nonlinear_inequality_constraints,
    project_to_equality_constraints,
)
from botorch.optim.stopping import ExpMAStoppingCriterion
from botorch.optim.utils import (
    check_scipy_version_at_least,
    columnwise_clamp,
    fix_features,
    minimize_with_timeout,
)
from scipy.optimize import OptimizeResult
from threadpoolctl import threadpool_limits
from torch import Tensor
from torch.optim import Optimizer

SCIPY_MIN_VERSION: int = 13
SCIPY_UNTESTED_VERSION: int = 18

if check_scipy_version_at_least(
    minor=SCIPY_MIN_VERSION
) and not check_scipy_version_at_least(minor=SCIPY_UNTESTED_VERSION):
    # We only import the batched lbfgs_b code here, as it might otherwise
    # lead to import errors, if the wrong scipy version is used
    from botorch.optim.batched_lbfgs_b import (
        fmin_l_bfgs_b_batched,
        translate_bounds_for_lbfgsb,
    )

TGenCandidates = Callable[[Tensor, AcquisitionFunction, Any], tuple[Tensor, Tensor]]


[docs] def gen_candidates_scipy( initial_conditions: Tensor, acquisition_function: AcquisitionFunction, lower_bounds: float | Tensor | None = None, upper_bounds: float | Tensor | None = None, inequality_constraints: list[tuple[Tensor, Tensor, float]] | None = None, equality_constraints: list[tuple[Tensor, Tensor, float]] | None = None, nonlinear_inequality_constraints: list[tuple[Callable, bool]] | None = None, options: dict[str, Any] | None = None, fixed_features: Mapping[int, float | Tensor] | None = None, timeout_sec: float | None = None, use_parallel_mode: bool | None = None, ) -> tuple[Tensor, Tensor]: r"""Generate a set of candidates using ``scipy.optimize.minimize``. Optimizes an acquisition function starting from a set of initial candidates using ``scipy.optimize.minimize`` via a numpy converter. We use SLSQP, if constraints are present, and LBFGS-B otherwise. As ``scipy.optimize.minimize`` does not support optimizating a batch of problems, we treat optimizing a set of candidates as a single optimization problem by summing together their acquisition values. Args: initial_conditions: Starting points for optimization, with shape (b) x q x d. acquisition_function: Acquisition function to be used. lower_bounds: Minimum values for each column of initial_conditions. upper_bounds: Maximum values for each column of initial_conditions. inequality constraints: A list of tuples (indices, coefficients, rhs), with each tuple encoding an inequality constraint of the form ``\sum_i (X[indices[i]] * coefficients[i]) >= rhs``. equality constraints: A list of tuples (indices, coefficients, rhs), with each tuple encoding an inequality constraint of the form ``\sum_i (X[indices[i]] * coefficients[i]) = rhs``. nonlinear_inequality_constraints: A list of tuples representing the nonlinear inequality constraints. The first element in the tuple is a callable representing a constraint of the form ``callable(x) >= 0``. In case of an intra-point constraint, ``callable()``takes in an one-dimensional tensor of shape ``d`` and returns a scalar. In case of an inter-point constraint, ``callable()`` takes a two dimensional tensor of shape ``q x d`` and again returns a scalar. The second element is a boolean, indicating if it is an intra-point or inter-point constraint (``True`` for intra-point. ``False`` for inter-point). For more information on intra-point vs inter-point constraints, see the docstring of the ``inequality_constraints`` argument to ``optimize_acqf()``. The constraints will later be passed to the scipy solver. options: Options used to control the optimization including "method" and "maxiter". Select method for ``scipy.optimize.minimize`` using the "method" key. By default uses L-BFGS-B for box-constrained problems and SLSQP if inequality or equality constraints are present. If ``with_grad=False``, then we use a two-point finite difference estimate of the gradient. fixed_features: Mapping[int, float | Tensor] | None, all generated candidates will have features fixed to these values. If passing tensors as values, they should have either shape ``b`` or ``b x q`` to fix the same feature to different values in the batch. Assumes values to be compatible with lower_bounds and upper_bounds! timeout_sec: Timeout (in seconds) for ``scipy.optimize.minimize`` routine - if provided, optimization will stop after this many seconds and return the best solution found so far. use_parallel_mode: If None uses the parallel implementation of l-bfgs-b, if possible. If True, forces the use of the parallel implementation and fails if not possible. If using parallel mode, each item in the batch dimension is treated as a separate optimization problem, we enforce the shape of the initial conditions to be ``b x q x d`` or ``q x d``, and we assume the ``acquisition_function`` does not treat elements differently in the batch dimension (it is simply a batched function). If False, forces the use of the serial implementation through ``scipy.optimize.minimize``. Returns: 2-element tuple containing - The set of generated candidates. - The acquisition value for each t-batch. Example: >>> qEI = qExpectedImprovement(model, best_f=0.2) >>> bounds = torch.tensor([[0., 0.], [1., 2.]]) >>> Xinit = gen_batch_initial_conditions( >>> qEI, bounds, q=3, num_restarts=25, raw_samples=500 >>> ) >>> batch_candidates, batch_acq_values = gen_candidates_scipy( initial_conditions=Xinit, acquisition_function=qEI, lower_bounds=bounds[0], upper_bounds=bounds[1], ) """ if use_parallel_mode: if initial_conditions.ndim not in (2, 3): raise UnsupportedError( "Initial conditions must have either 2 or 3 dimensions. " f"Received shape {initial_conditions.shape}" ) options = options or {} options = {**options, "maxiter": options.get("maxiter", 2000)} original_initial_conditions_shape = initial_conditions.shape if initial_conditions.ndim == 2: initial_conditions = initial_conditions.unsqueeze(0) initial_conditions_all_features = initial_conditions if fixed_features: initial_conditions = initial_conditions[ ..., [i for i in range(initial_conditions.shape[-1]) if i not in fixed_features], ] if isinstance(lower_bounds, Tensor): lower_bounds = lower_bounds[ [i for i in range(len(lower_bounds)) if i not in fixed_features] ] if isinstance(upper_bounds, Tensor): upper_bounds = upper_bounds[ [i for i in range(len(upper_bounds)) if i not in fixed_features] ] clamped_candidates = columnwise_clamp( X=initial_conditions, lower=lower_bounds, upper=upper_bounds, raise_on_violation=True, ) def f(x): return -acquisition_function(x) is_constrained = ( nonlinear_inequality_constraints or equality_constraints or inequality_constraints ) method = options.get("method", "SLSQP" if is_constrained else "L-BFGS-B") with_grad = options.get("with_grad", True) minimize_options = { k: v for k, v in options.items() if k not in [ "method", "callback", "with_grad", "max_optimization_problem_aggregation_size", ] } why_not_fast_path = get_reasons_against_fast_path( method=method, with_grad=with_grad, minimize_options=minimize_options, timeout_sec=timeout_sec, ) f_np_wrapper = _get_f_np_wrapper( clamped_candidates.shape, initial_conditions.device, initial_conditions.dtype, with_grad, ) if not why_not_fast_path and use_parallel_mode is not False: if is_constrained: raise RuntimeWarning("Method L-BFGS-B cannot handle constraints.") batched_x0 = _arrayify(clamped_candidates).reshape(len(clamped_candidates), -1) l_bfgs_b_bounds = translate_bounds_for_lbfgsb( lower_bounds=lower_bounds, upper_bounds=upper_bounds, num_features=clamped_candidates.shape[-1], q=clamped_candidates.shape[1], ) with threadpool_limits(limits=1, user_api="blas"): xs, fs, results = fmin_l_bfgs_b_batched( func=partial(f_np_wrapper, f=f, fixed_features=fixed_features), # args is not necessary, done via the partial instead x0=batched_x0, # method=method, # method is not necessary as it is only l-bfgs-b # jac=with_grad, this is assumed to be true bounds=l_bfgs_b_bounds, # constraints=constraints, callback=options.get("callback", None), pass_batch_indices=True, **minimize_options, ) for res in results: _process_scipy_result(res=res, options=options) else: # In this case we optimize multiple initial conditions in a single # problem, up to max_optimization_problem_aggregation_size at a time. max_optimization_problem_aggregation_size = options.get( "max_optimization_problem_aggregation_size", len(clamped_candidates) ) if use_parallel_mode is not False: msg = ( "Not using the parallel implementation of l-bfgs-b, as: " + ", and ".join(why_not_fast_path) ) if use_parallel_mode: raise NotImplementedError(msg) else: logger.debug(msg) if ( fixed_features and any( torch.is_tensor(ff) and ff.ndim > 0 for ff in fixed_features.values() ) and max_optimization_problem_aggregation_size != 1 ): raise UnsupportedError( "Batch shaped fixed features are not " "supported, when optimizing more than one optimization " "problem at a time." ) all_xs = [] split_candidates = clamped_candidates.split( max_optimization_problem_aggregation_size ) for i, candidates_ in enumerate(split_candidates): if fixed_features: fixed_features_ = { k: ( ff[i : i + 1].item() # from the test above, we know that we only treat one candidate # at a time thus we can use index i if torch.is_tensor(ff) and ff.ndim > 0 else ff ) for k, ff in fixed_features.items() } else: fixed_features_ = None _no_fixed_features = _remove_fixed_features_from_optimization( fixed_features=fixed_features_, acquisition_function=acquisition_function, initial_conditions=None, d=initial_conditions_all_features.shape[-1], lower_bounds=None, upper_bounds=None, inequality_constraints=inequality_constraints, equality_constraints=equality_constraints, nonlinear_inequality_constraints=nonlinear_inequality_constraints, ) bounds = make_scipy_bounds( X=candidates_, lower_bounds=lower_bounds, upper_bounds=upper_bounds, ) f_np_wrapper_ = partial( f_np_wrapper, fixed_features=fixed_features_, ) # Project initial conditions onto the equality constraint # manifold so they satisfy constraints to numerical precision. # MCMC-sampled initial conditions (from HitAndRunPolytopeSampler) # may only approximately satisfy equality constraints. unfixed_eq = _no_fixed_features.equality_constraints if unfixed_eq: candidates_ = project_to_equality_constraints( X=candidates_, equality_constraints=unfixed_eq ) # Re-clamp after projection to stay within bounds. candidates_ = columnwise_clamp( X=candidates_, lower=lower_bounds, upper=upper_bounds, raise_on_violation=False, ) x0 = candidates_.flatten() constraints = make_scipy_linear_constraints( shapeX=candidates_.shape, inequality_constraints=_no_fixed_features.inequality_constraints, equality_constraints=_no_fixed_features.equality_constraints, ) if _no_fixed_features.nonlinear_inequality_constraints: # Make sure ``batch_limit`` is 1 for now. if not (len(candidates_.shape) == 3 and candidates_.shape[0] == 1): raise ValueError( "`batch_limit` must be 1 when non-linear inequality " "constraints are given." ) nl_ineq_constraints = ( _no_fixed_features.nonlinear_inequality_constraints ) constraints += make_scipy_nonlinear_inequality_constraints( nonlinear_inequality_constraints=nl_ineq_constraints, f_np_wrapper=f_np_wrapper_, x0=x0, shapeX=candidates_.shape, ) x0 = _arrayify(x0) res = minimize_with_timeout( fun=f_np_wrapper_, args=(f,), x0=x0, method=method, jac=with_grad, bounds=bounds, constraints=constraints, callback=options.get("callback", None), options=minimize_options, timeout_sec=( timeout_sec / len(split_candidates) if timeout_sec is not None else None ), ) _process_scipy_result(res=res, options=options) xs = res.x.reshape(candidates_.shape) all_xs.append(xs) xs = np.concatenate(all_xs) candidates = torch.from_numpy(xs).view_as(clamped_candidates).to(initial_conditions) clamped_candidates = columnwise_clamp( X=candidates, lower=lower_bounds, upper=upper_bounds, raise_on_violation=True ) clamped_candidates = fix_features( X=clamped_candidates, fixed_features=fixed_features, replace_current_value=False, ) clamped_candidates = clamped_candidates.reshape(original_initial_conditions_shape) with torch.no_grad(): batch_acquisition = acquisition_function(clamped_candidates) return clamped_candidates, batch_acquisition
def _get_f_np_wrapper(shapeX, device, dtype, with_grad): if with_grad: def f_np_wrapper( x: npt.NDArray, f: Callable, fixed_features: Mapping[int, float | Tensor] | None, batch_indices: list[int] | None = None, ) -> tuple[float | np.NDArray, np.NDArray]: """Given a torch callable, compute value + grad given a numpy array.""" if np.isnan(x).any(): raise RuntimeError( f"{np.isnan(x).sum()} elements of the {x.size} element array " f"`x` are NaN." ) X = ( torch.from_numpy(x) .to(device=device, dtype=dtype) # We reshape in this way, as in parallel mode the batch dimension might # change during optimization: some examples might finish earlier than # others. .view(-1, *shapeX[1:]) .contiguous() .requires_grad_(True) ) if fixed_features is not None: if batch_indices is not None: this_fixed_features = { k: ( ff[batch_indices] if torch.is_tensor(ff) and ff.ndim > 0 else ff ) for k, ff in fixed_features.items() } else: this_fixed_features = fixed_features else: this_fixed_features = None X_fix = fix_features( X, fixed_features=this_fixed_features, replace_current_value=False ) # we compute the loss on the whole batch, under the assumption that f # treats multiple inputs in the 0th dimension as independent # inputs in a batch losses = f(X_fix) loss = losses.sum() # compute gradient w.r.t. the inputs (does not accumulate in leaves) gradf = _arrayify(torch.autograd.grad(loss, X)[0].contiguous().view(-1)) gradf = gradf.reshape(*x.shape) if np.isnan(gradf).any(): msg = ( f"{np.isnan(gradf).sum()} elements of the {x.size} element " "gradient array `gradf` are NaN. " "This often indicates numerical issues." ) if x.dtype != torch.double: msg += " Consider using `dtype=torch.double`." raise OptimizationGradientError(msg, current_x=x) fval = ( losses.detach().view(-1).cpu().numpy() if batch_indices is not None else loss.detach().item() ) # the view(-1) seems necessary as f might return a single scalar return fval, gradf else: # This function (that is used if no grads are avail) can also be batched, # we just did not batch it so far as the priority is not as high. def f_np_wrapper( x: npt.NDArray, f: Callable, fixed_features: dict[int, float] | None ): X = ( torch.from_numpy(x) .to(device=device, dtype=dtype) .view(-1, *shapeX[1:]) .contiguous() ) with torch.no_grad(): X_fix = fix_features( X=X, fixed_features=fixed_features, replace_current_value=False ) loss = f(X_fix).sum() fval = loss.detach().item() return fval return f_np_wrapper
[docs] def get_reasons_against_fast_path( method: str, with_grad: bool, minimize_options: dict[str, Any], timeout_sec: float | None, ) -> list[str]: # this if-statement is a homage to pytorch's nn.MultiheadAttentionby by @swolchok why_not_fast_path = [] if not method == "L-BFGS-B": why_not_fast_path.append(f"method={method}, method needs to be L-BFGS-B") if not with_grad: why_not_fast_path.append("with_grad=False, it needs to be True") if extra_keys := set(minimize_options.keys()) - { "maxiter", "disp", "iprint", "max_cor", "ftol", "pgtol", "factr", "tol", "maxls", }: why_not_fast_path.append(f"options={extra_keys} are not accepted") if timeout_sec is not None: why_not_fast_path.append(f"timeout_sec={timeout_sec}, it needs to be None") if not check_scipy_version_at_least( minor=SCIPY_MIN_VERSION ) or check_scipy_version_at_least(minor=SCIPY_UNTESTED_VERSION): # pragma: no cover # In SciPy 1.15.0, the fortran implementation of L-BFGS-B was # translated to C changing its interface slightly. # Additionally, we don't know what the future might hold in scipy, # thus we use this function to use less optimized code for too new # scipy versions. why_not_fast_path.append( f"Scipy version is not in the range from 1.{SCIPY_MIN_VERSION}.0 to " f"1.{SCIPY_UNTESTED_VERSION}.x." ) return why_not_fast_path
[docs] def gen_candidates_torch( initial_conditions: Tensor, acquisition_function: AcquisitionFunction, lower_bounds: float | Tensor | None = None, upper_bounds: float | Tensor | None = None, optimizer: type[Optimizer] = torch.optim.Adam, options: dict[str, float | str] | None = None, callback: Callable[[int, Tensor, Tensor], NoReturn] | None = None, fixed_features: Mapping[int, float | Tensor] | None = None, timeout_sec: float | None = None, ) -> tuple[Tensor, Tensor]: r"""Generate a set of candidates using a ``torch.optim`` optimizer. Optimizes an acquisition function starting from a set of initial candidates using an optimizer from ``torch.optim``. Args: initial_conditions: Starting points for optimization. acquisition_function: Acquisition function to be used. lower_bounds: Minimum values for each column of initial_conditions. upper_bounds: Maximum values for each column of initial_conditions. optimizer (Optimizer): The pytorch optimizer to use to perform candidate search. options: Options used to control the optimization. Includes - optimizer_options: Dict of additional options to pass to the optimizer (e.g. lr, weight_decay) - stopping_criterion_options: Dict of options for the stopping criterion. callback: A callback function accepting the current iteration, loss, and gradients as arguments. This function is executed after computing the loss and gradients, but before calling the optimizer. fixed_features: This is a dictionary of feature indices to values, where all generated candidates will have features fixed to these values. If a float is passed it is fixed across [b,q], if a tensor is passed: it might either be of shape [b,q] or [b], in which case the same value is used across the q dimension. Assumes values to be compatible with lower_bounds and upper_bounds! timeout_sec: Timeout (in seconds) for optimization. If provided, ``gen_candidates_torch`` will stop after this many seconds and return the best solution found so far. Returns: 2-element tuple containing - The set of generated candidates. - The acquisition value for each t-batch. Example: >>> qEI = qExpectedImprovement(model, best_f=0.2) >>> bounds = torch.tensor([[0., 0.], [1., 2.]]) >>> Xinit = gen_batch_initial_conditions( >>> qEI, bounds, q=3, num_restarts=25, raw_samples=500 >>> ) >>> batch_candidates, batch_acq_values = gen_candidates_torch( initial_conditions=Xinit, acquisition_function=qEI, lower_bounds=bounds[0], upper_bounds=bounds[1], ) """ start_time = time.monotonic() options = options or {} # We remove max_optimization_problem_aggregation_size as it does not affect # the 1st order optimizers implemented in this method. # Here, it does not matter whether one combines multiple optimizations into # one or not. _clamp = partial(columnwise_clamp, lower=lower_bounds, upper=upper_bounds) clamped_candidates = _clamp(initial_conditions) if fixed_features: clamped_candidates = clamped_candidates[ ..., [i for i in range(clamped_candidates.shape[-1]) if i not in fixed_features], ] clamped_candidates = clamped_candidates.requires_grad_(True) # Extract optimizer-specific options from the options dict optimizer_options = options.get("optimizer_options", {}).copy() stopping_criterion_options = options.get("stopping_criterion_options", {}).copy() # Backward compatibility: if old 'maxiter' parameter is passed, move it to # stopping_criterion_options with a deprecation warning if "maxiter" in options: warnings.warn( "Passing 'maxiter' directly in options is deprecated. " "Please use options['stopping_criterion_options']['maxiter'] instead.", DeprecationWarning, stacklevel=2, ) # For backward compatibility, pass to stopping_criterion_options if "maxiter" not in stopping_criterion_options: stopping_criterion_options["maxiter"] = options["maxiter"] optimizer_options.setdefault("lr", 0.025) _optimizer = optimizer(params=[clamped_candidates], **optimizer_options) i = 0 stop = False stopping_criterion = ExpMAStoppingCriterion(**stopping_criterion_options) while not stop: i += 1 with torch.no_grad(): X = _clamp(clamped_candidates).requires_grad_(True) loss = -acquisition_function(fix_features(X, fixed_features)).sum() grad = torch.autograd.grad(loss, X)[0] if callback: callback(i, loss, grad) def assign_grad(): _optimizer.zero_grad() clamped_candidates.grad = grad return loss _optimizer.step(assign_grad) stop = stopping_criterion(fvals=loss.detach()) if timeout_sec is not None: runtime = time.monotonic() - start_time if runtime > timeout_sec: stop = True logger.info(f"Optimization timed out after {runtime} seconds.") clamped_candidates = _clamp(clamped_candidates) clamped_candidates = fix_features(clamped_candidates, fixed_features) with torch.no_grad(): batch_acquisition = acquisition_function(clamped_candidates) return clamped_candidates, batch_acquisition
[docs] def get_best_candidates(batch_candidates: Tensor, batch_values: Tensor) -> Tensor: r"""Extract best (q-batch) candidate from batch of candidates Args: batch_candidates: A ``b x q x d`` tensor of ``b`` q-batch candidates, or a ``b x d`` tensor of ``b`` single-point candidates. batch_values: A tensor with ``b`` elements containing the value of the respective candidate (higher is better). Returns: A tensor of size ``q x d`` (if q-batch mode) or ``d`` from batch_candidates with the highest associated value. Example: >>> qEI = qExpectedImprovement(model, best_f=0.2) >>> bounds = torch.tensor([[0., 0.], [1., 2.]]) >>> Xinit = gen_batch_initial_conditions( >>> qEI, bounds, q=3, num_restarts=25, raw_samples=500 >>> ) >>> batch_candidates, batch_acq_values = gen_candidates_scipy( initial_conditions=Xinit, acquisition_function=qEI, lower_bounds=bounds[0], upper_bounds=bounds[1], ) >>> best_candidates = get_best_candidates(batch_candidates, batch_acq_values) """ best = torch.argmax(batch_values.view(-1), dim=0) return batch_candidates[best]
def _process_scipy_result(res: OptimizeResult, options: dict[str, Any]) -> None: r"""Process scipy optimization result to produce relevant logs and warnings.""" if "success" not in res.keys() or "status" not in res.keys(): with warnings.catch_warnings(): warnings.simplefilter("always", category=OptimizationWarning) warnings.warn( "Optimization failed within `scipy.optimize.minimize` with no " "status returned to `res.`", OptimizationWarning, stacklevel=3, ) elif not res.success: if ( "ITERATIONS REACHED LIMIT" in res.message or "Iteration limit reached" in res.message ): logger.info( "`scipy.optimize.minimize` exited by reaching the iteration limit of " f"`maxiter: {options.get('maxiter')}`." ) elif "EVALUATIONS EXCEEDS LIMIT" in res.message: logger.info( "`scipy.optimize.minimize` exited by reaching the function evaluation " f"limit of `maxfun: {options.get('maxfun')}`." ) elif "Optimization timed out after" in res.message: logger.info(res.message) else: with warnings.catch_warnings(): warnings.simplefilter("always", category=OptimizationWarning) warnings.warn( f"Optimization failed within `scipy.optimize.minimize` with status " f"{res.status} and message {res.message}.", OptimizationWarning, stacklevel=3, )