diff --git a/package/samplers/orthobo/LICENSE b/package/samplers/orthobo/LICENSE new file mode 100644 index 00000000..9548e154 --- /dev/null +++ b/package/samplers/orthobo/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Maxim Kirby. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/package/samplers/orthobo/README.md b/package/samplers/orthobo/README.md new file mode 100644 index 00000000..300a688e --- /dev/null +++ b/package/samplers/orthobo/README.md @@ -0,0 +1,101 @@ +--- +author: Maxim Kirby +title: 'ORTHOBO: Orthogonal Bayesian Hyperparameter Optimization' +description: A drop-in Optuna sampler that reduces acquisition estimation noise in marginal Bayesian optimization using orthogonal score-function control variates, improving candidate ranking stability and optimization performance. +tags: [sampler, bayesian-optimization, gaussian-process, variance-reduction, botorch] +optuna_versions: [4.8.0] +license: MIT License +--- + +## Abstract + +Standard Bayesian optimization handles uncertainty by taking random samples of model parameters, but this random sampling creates noise. This noise can cause the optimizer to accidentally rank bad configurations above good ones and steer the search in the wrong direction. This issue is especially noticeable in complex tasks with many variables or when computational limits restrict the number of samples you can take. + +This package implements OrthoBO, a framework introduced in [this paper from May 2026](https://arxiv.org/abs/2605.06454). OrthoBO uses an orthogonal correction to cancel out the noise caused by finite sampling. It keeps the estimates accurate while making the candidate rankings much more stable. The result is better optimization performance. + +OrthoBO is designed as a drop-in replacement for Optuna's default BoTorch sampler. It works with almost any standard Bayesian optimization problem and requires absolutely no changes to your objective function or search space. + +## APIs + +### `OrthoBoSampler` + +```python +OrthoBoSampler( + n_startup_trials: int = 10, + mc_budget: int = 64, + use_orthogonal_correction bool = True, + seed: int | None = None, +) +``` + +**Parameters:** + +- `n_startup_trials`: + Number of initial quasi-random (Sobol) trials before BO begins. Defaults to 10. +- `mc_budget`: + The number of GP models sampled. Higher values reduce variance but increase compute time. Defaults to 64. +- `use_orthogonal_correction`: + If True, applies the orthogonal score-function control variate for variance reduction (OrthoBO mode). If False, falls back to plain MC marginalisation over the hyperposterior (Naive Marginal BO mode). Defaults to True +- `seed`: + Random seed for the Sobol startup sampler. Set for reproducibility. Defaults to None. + +## Installation + +```bash +pip install torch botorch gpytorch optuna optuna-integration +``` + +## Example + +```python +import optuna +import optunahub + +def objective(trial: optuna.Trial) -> float: + x = trial.suggest_float("x", -5.0, 5.0) + y = trial.suggest_float("y", -5.0, 5.0) + return (x - 2) ** 2 + (y + 1) ** 2 + +OrthoBoSampler = optunahub.load_module("samplers/orthobo").OrthoBoSampler + +sampler = OrthoBoSampler(n_startup_trials=10, mc_budget=64, use_orthogonal_correction=True) +study = optuna.create_study(direction="minimize", sampler=sampler) +study.optimize(objective, n_trials=50) + +print(f"Best value: {study.best_value:.4f}") +print(f"Best params: {study.best_params}") +``` + +## Benchmark Results + +Mean best-so-far regret ± 1 std across 5 random seeds. OrthoBO (green) is +compared against Vanilla BO (standard single-GP qLogEI, blue) and Naive +Marginal BO (MC marginalisation without orthogonal correction, orange). + +### Hartmann-6 (6 dimensions) + +

+ Hartmann6 +

+ +### Levy-16 (16 dimensions) + +

+ Levy16 +

+ +OrthoBO's advantage is most pronounced on higher-dimensional problems where acquisition estimation noise has a larger effect on candidate rankings. On lower-dimensional problems (Hartmann-6) where the GP fits well with few observations, the methods converge to similar performance. + +## Reference + +```bibtex +@misc{schröder2026orthoboorthogonalbayesianhyperparameter, + title={{ORTHOBO}: Orthogonal Bayesian Hyperparameter Optimization}, + author={Maresa Schröder and Pascal Janetzky and Michael Klar and Stefan Feuerriegel}, + year={2026}, + eprint={2605.06454}, + archivePrefix={arXiv}, + primaryClass={cs.LG}, + url={https://arxiv.org/abs/2605.06454}, +} +``` diff --git a/package/samplers/orthobo/__init__.py b/package/samplers/orthobo/__init__.py new file mode 100644 index 00000000..29230bd2 --- /dev/null +++ b/package/samplers/orthobo/__init__.py @@ -0,0 +1,4 @@ +from .sampler import OrthoBoSampler + + +__all__ = ["OrthoBoSampler"] diff --git a/package/samplers/orthobo/example.py b/package/samplers/orthobo/example.py new file mode 100644 index 00000000..c83678f0 --- /dev/null +++ b/package/samplers/orthobo/example.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import optuna +import optunahub + + +optuna.logging.set_verbosity(optuna.logging.WARNING) + + +def hartmann6(trial: optuna.Trial) -> float: + """Hartmann-6 benchmark. Global minimum: -3.32237 at ~(0.20, 0.15, 0.48, 0.27, 0.31, 0.66).""" + from botorch.test_functions import Hartmann + import torch + + f = Hartmann(dim=6) + x = torch.tensor([trial.suggest_float(f"x{i}", 0.0, 1.0) for i in range(6)]) + return f(x.unsqueeze(0)).item() + + +if __name__ == "__main__": + OrthoBoSampler = optunahub.load_module("samplers/orthobo").OrthoBoSampler + + sampler = OrthoBoSampler(n_startup_trials=10, mc_budget=64) + study = optuna.create_study(direction="minimize", sampler=sampler) + study.optimize(hartmann6, n_trials=30) + + print(f"Best value : {study.best_value:.5f} (global min: -3.32237)") + print(f"Best params: {study.best_params}") diff --git a/package/samplers/orthobo/images/Hartmann6.png b/package/samplers/orthobo/images/Hartmann6.png new file mode 100644 index 00000000..87c169ca Binary files /dev/null and b/package/samplers/orthobo/images/Hartmann6.png differ diff --git a/package/samplers/orthobo/images/Levy16.png b/package/samplers/orthobo/images/Levy16.png new file mode 100644 index 00000000..9b23b600 Binary files /dev/null and b/package/samplers/orthobo/images/Levy16.png differ diff --git a/package/samplers/orthobo/requirements.txt b/package/samplers/orthobo/requirements.txt new file mode 100644 index 00000000..a5287330 --- /dev/null +++ b/package/samplers/orthobo/requirements.txt @@ -0,0 +1,5 @@ +torch>=2.0.0 +botorch>=0.9.0 +gpytorch>=1.11 +optuna>=4.0.0 +optuna-integration>=0.1.0 \ No newline at end of file diff --git a/package/samplers/orthobo/sampler.py b/package/samplers/orthobo/sampler.py new file mode 100644 index 00000000..ad677579 --- /dev/null +++ b/package/samplers/orthobo/sampler.py @@ -0,0 +1,315 @@ +from __future__ import annotations + +import copy +from dataclasses import dataclass +from typing import Any +from typing import List +from typing import Tuple + +from botorch.acquisition.acquisition import AcquisitionFunction +from botorch.acquisition.analytic import LogExpectedImprovement +from botorch.fit import fit_gpytorch_mll +from botorch.models import SingleTaskGP +from botorch.models.transforms.input import Normalize +from botorch.models.transforms.outcome import Standardize +from botorch.optim import optimize_acqf +from gpytorch.mlls import ExactMarginalLogLikelihood +from optuna_integration import BoTorchSampler +import torch +from torch import Tensor +from torch.distributions import MultivariateNormal + + +# --------------------------------------------------------------------------- +# Hyperparam extraction utils +# --------------------------------------------------------------------------- + + +@dataclass +class ThetaField: + path: str + shape: torch.Size + size: int + transform: str = "log" + + +@dataclass +class ThetaSpec: + fields: List[ThetaField] + + @property + def total_size(self) -> int: + return sum(f.size for f in self.fields) + + +_SUPPORTED_POSITIVE_PATHS = [ + "likelihood.noise", + "covar_module.outputscale", + "covar_module.variance", + "covar_module.lengthscale", +] + + +def _get_attr(obj: Any, path: str) -> Any: + for part in path.split("."): + obj = getattr(obj, part) + return obj + + +def _set_attr(obj: Any, path: str, value: Any) -> None: + parts = path.split(".") + for part in parts[:-1]: + obj = getattr(obj, part) + setattr(obj, parts[-1], value) + + +def _discover_theta_spec(model: Any) -> ThetaSpec: + fields = [] + for path in _SUPPORTED_POSITIVE_PATHS: + try: + val = _get_attr(model, path) + if torch.is_tensor(val): + fields.append(ThetaField(path=path, shape=val.shape, size=val.numel())) + except AttributeError: + continue + if not fields: + raise RuntimeError("No supported GP hyperparameters found.") + return ThetaSpec(fields=fields) + + +def _extract_log_theta(model: Any) -> Tuple[Tensor, ThetaSpec]: + spec = _discover_theta_spec(model) + pieces = [ + _get_attr(model, f.path).detach().reshape(-1).clamp_min(1e-12).log() for f in spec.fields + ] + return torch.cat(pieces), spec + + +def _set_theta_from_log(model: Any, u: Tensor, spec: ThetaSpec) -> None: + offset = 0 + with torch.no_grad(): + for field in spec.fields: + chunk = u[offset : offset + field.size].view(field.shape).exp() + _set_attr(model, field.path, chunk) + offset += field.size + model.prediction_strategy = None + + +# --------------------------------------------------------------------------- +# Hyperposterior (diagonal Laplace approximation) +# --------------------------------------------------------------------------- + + +@dataclass +class GaussianHyperPosterior: + mean: Tensor + cov: Tensor + + def sample(self, S: int) -> Tensor: + return MultivariateNormal(self.mean, covariance_matrix=self.cov).rsample((S,)) + + def score(self, u: Tensor) -> Tensor: + diff = u - self.mean + return -torch.linalg.solve(self.cov, diff.T).T + + +def _neg_log_post(u: Tensor, model_template: Any, spec: ThetaSpec) -> Tensor: + model = copy.deepcopy(model_template) + mll = ExactMarginalLogLikelihood(model.likelihood, model) + model.train() + model.likelihood.train() + _set_theta_from_log(model, u, spec) + output = model(model.train_inputs[0]) + return -mll(output, model.train_targets) + + +def _build_hyperposterior( + fitted_model: Any, + hess_jitter: float = 1e-4, + max_log_std: float = 0.10, +) -> Tuple[GaussianHyperPosterior, ThetaSpec]: + u_hat, spec = _extract_log_theta(fitted_model) + u_hat = u_hat.detach().clone().requires_grad_(True) + + H = torch.autograd.functional.hessian( + lambda u: _neg_log_post(u, fitted_model, spec), u_hat + ).detach() + H = 0.5 * (H + H.T) + H_diag = torch.diag(H).clamp_min(hess_jitter) + std = torch.clamp(1.0 / torch.sqrt(H_diag), max=max_log_std) + + posterior = GaussianHyperPosterior( + mean=u_hat.detach(), + cov=torch.diag(std.pow(2)).detach(), + ) + return posterior, spec + + +def _sample_valid_thetas( + model: Any, + hyperposterior: GaussianHyperPosterior, + spec: ThetaSpec, + S: int, + train_x: Tensor, + max_tries: int = 1000, +) -> Tuple[Tensor, List[Any]]: + accepted_models: list[Any] = [] + accepted_thetas: list[Tensor] = [] + thetas = hyperposterior.sample(2 * S) + idx = tries = 0 + + while len(accepted_models) < S and tries < max_tries: + theta = thetas[idx] + tries += 1 + try: + m = copy.deepcopy(model) + m.prediction_strategy = None + _set_theta_from_log(m, theta, spec) + m.posterior(train_x[:1]) # validate + accepted_models.append(m) + accepted_thetas.append(theta) + except Exception: + pass + idx += 1 + + if len(accepted_models) < S: + raise RuntimeError( + f"Could not obtain {S} valid theta samples after {max_tries} tries. " + "Consider increasing hess_jitter or reducing mc_budget." + ) + return torch.stack(accepted_thetas, dim=0), accepted_models + + +# --------------------------------------------------------------------------- +# Orthogonal acquisition function +# --------------------------------------------------------------------------- + + +class _OrthogonalLogEI(AcquisitionFunction): + def __init__( + self, + model: Any, + best_f: Tensor, + hyperposterior: GaussianHyperPosterior, + theta_samples: Tensor, + theta_models: List[Any], + use_orthogonal_correction: bool = True, + eps: float = 1e-12, + cov_jitter: float = 1e-6, + ) -> None: + super().__init__(model=model) + self.eps = eps + self.cov_jitter = cov_jitter + self.use_orthogonal_correction = use_orthogonal_correction + + g = hyperposterior.score(theta_samples) + self.g = g + self.g_centered = g - g.mean(dim=0, keepdim=True) + self.g_cov = torch.cov(g.T) + + self.theta_acqfns = [LogExpectedImprovement(model=m, best_f=best_f) for m in theta_models] + + def forward(self, X: Tensor) -> Tensor: + h = torch.stack([torch.exp(fn(X)) for fn in self.theta_acqfns], dim=0) + + if not self.use_orthogonal_correction: + return torch.log(torch.clamp_min(h.mean(dim=0), self.eps)) + + S = self.g.shape[0] + h_centered = h - h.mean(dim=0, keepdim=True) + cov_gg = self.g_cov + self.cov_jitter * torch.eye( + self.g_cov.shape[0], dtype=self.g_cov.dtype, device=self.g_cov.device + ) + cov_gh = (self.g_centered.T @ h_centered) / max(S - 1, 1) + gamma = torch.linalg.solve(cov_gg, cov_gh) + orth = h - (self.g @ gamma) + return torch.log(torch.clamp_min(orth.mean(dim=0), self.eps)) + + +# --------------------------------------------------------------------------- +# Sampler +# --------------------------------------------------------------------------- + + +class OrthoBoSampler(BoTorchSampler): + """ + Orthogonalized Bayesian Optimization sampler for Optuna. + + Args: + n_startup_trials: + Number of initial quasi-random (Sobol) trials before BO begins. + Defaults to 10. + mc_budget: + Number of GP models sampled from the hyperposterior (S in paper). + Higher values reduce variance but increase compute. Defaults to 64. + use_orthogonal_correction: + If True (default), applies the orthogonal score-function control + variate for variance reduction (OrthoBO mode). + If False, falls back to plain MC marginalisation over the + hyperposterior (Naive Marginal BO mode). + seed: + Random seed for the Sobol startup sampler. Set for reproducibility. + Defaults to None. + """ + + def __init__( + self, + n_startup_trials: int = 10, + mc_budget: int = 64, + use_orthogonal_correction: bool = True, + seed: int | None = None, + ) -> None: + super().__init__( + candidates_func=self._get_candidates, + n_startup_trials=n_startup_trials, + seed=seed, + ) + self.mc_budget = mc_budget + self.use_orthogonal_correction = use_orthogonal_correction + + def _get_candidates( + self, + train_x: Tensor, + train_obj: Tensor, + train_con: Tensor | None, + bounds: Tensor, + pending_x: Tensor | None, + ) -> Tensor: + dim = train_x.shape[-1] + + model = SingleTaskGP( + train_x, + train_obj, + input_transform=Normalize(d=dim, bounds=bounds), + outcome_transform=Standardize(m=1), + ) + mll = ExactMarginalLogLikelihood(model.likelihood, model) + fit_gpytorch_mll(mll) + + hyperposterior, spec = _build_hyperposterior(model) + theta_samples, theta_models = _sample_valid_thetas( + model=model, + hyperposterior=hyperposterior, + spec=spec, + S=self.mc_budget, + train_x=train_x, + ) + + acqf = _OrthogonalLogEI( + model=model, + best_f=train_obj.max(), + hyperposterior=hyperposterior, + theta_samples=theta_samples, + theta_models=theta_models, + use_orthogonal_correction=self.use_orthogonal_correction, + ) + + candidate, _ = optimize_acqf( + acq_function=acqf, + bounds=bounds, + q=1, + num_restarts=10, + raw_samples=256, + ) + + return candidate.clamp(bounds[0], bounds[1])