|
58 | 58 | import math |
59 | 59 | from abc import ABC |
60 | 60 |
|
| 61 | +import numpy as np |
61 | 62 | import torch |
62 | 63 | from botorch.exceptions.errors import InputDataError |
63 | 64 | from botorch.test_functions.base import BaseTestProblem, ConstrainedBaseTestProblem |
64 | 65 | from botorch.test_functions.utils import round_nearest |
| 66 | +from scipy import interpolate as si |
65 | 67 | from torch import Tensor |
66 | 68 |
|
67 | 69 |
|
@@ -990,6 +992,211 @@ def _evaluate_true(self, X: Tensor) -> Tensor: |
990 | 992 | return (self.dim**2) / (2.0 * energy) |
991 | 993 |
|
992 | 994 |
|
| 995 | +class TrajectoryPlanning(SyntheticTestFunction): |
| 996 | + r"""Trajectory optimization benchmark for navigating through obstacle fields. |
| 997 | +
|
| 998 | + This test function optimizes a trajectory from a fixed start point to a goal |
| 999 | + point while minimizing the cost integrated over the trajectory. |
| 1000 | +
|
| 1001 | + Problem Setup: |
| 1002 | + - Domain: [0, 1]^dim hypercube (dim must be even) |
| 1003 | + - Start: (0.05, 0.05) |
| 1004 | + - Goal: (0.95, 0.95) |
| 1005 | + - Obstacles: Grid of axis-aligned squares centered at regular intervals |
| 1006 | +
|
| 1007 | + Parameterization: |
| 1008 | + Parameters specify perturbations to goal-directed steps. This biases |
| 1009 | + the search toward trajectories that make progress toward the goal |
| 1010 | + while allowing deviations to avoid obstacles. |
| 1011 | +
|
| 1012 | + Interpolation Modes: |
| 1013 | + - Cubic spline (use_smooth_interp=True, default): Smooth C2 trajectories |
| 1014 | + - Piecewise linear (use_smooth_interp=False): Straight-line segments |
| 1015 | +
|
| 1016 | + Cost Function: |
| 1017 | + The cost is computed by integrating the arc length of the trajectory |
| 1018 | + that passes through obstacles: |
| 1019 | +
|
| 1020 | + cost = integral over trajectory of (20 if in_obstacle else 0) ds |
| 1021 | +
|
| 1022 | + Example: |
| 1023 | + >>> problem = TrajectoryPlanning(dim=20) |
| 1024 | + >>> x = torch.rand(problem.dim) # Random trajectory parameters |
| 1025 | + >>> cost = problem(x) # Evaluate obstacle cost |
| 1026 | + """ |
| 1027 | + |
| 1028 | + _optimal_value = 0.0 |
| 1029 | + _check_grad_at_opt: bool = False |
| 1030 | + |
| 1031 | + def __init__( |
| 1032 | + self, |
| 1033 | + dim: int = 30, |
| 1034 | + use_smooth_interp: bool = True, |
| 1035 | + max_step_size: float | None = None, |
| 1036 | + negate: bool = False, |
| 1037 | + dtype: torch.dtype = torch.double, |
| 1038 | + ) -> None: |
| 1039 | + """Initialize the trajectory planning problem. |
| 1040 | +
|
| 1041 | + Args: |
| 1042 | + dim: Dimensionality of the optimization problem. Must be even since |
| 1043 | + each waypoint has 2 coordinates. |
| 1044 | + use_smooth_interp: If True, use cubic spline interpolation for |
| 1045 | + smooth trajectories. If False, use piecewise linear interpolation. |
| 1046 | + max_step_size: Maximum allowed step size. If None, automatically computed. |
| 1047 | + negate: If True, negate the cost (for maximization problems). |
| 1048 | + dtype: Data type for tensors. |
| 1049 | +
|
| 1050 | + Raises: |
| 1051 | + ValueError: If dim is not even. |
| 1052 | + """ |
| 1053 | + if dim % 2 != 0: |
| 1054 | + raise ValueError(f"dim must be even, got {dim}") |
| 1055 | + |
| 1056 | + self.dim = dim |
| 1057 | + self.num_waypoints = dim // 2 |
| 1058 | + self.use_smooth_interp = use_smooth_interp |
| 1059 | + self.start = torch.full((2,), 0.05, dtype=dtype) |
| 1060 | + self.goal = torch.full((2,), 0.95, dtype=dtype) |
| 1061 | + |
| 1062 | + straight_line_dist = torch.linalg.norm(self.goal - self.start).item() |
| 1063 | + self.max_step_size = ( |
| 1064 | + max_step_size or 1.5 * straight_line_dist / self.num_waypoints |
| 1065 | + ) |
| 1066 | + |
| 1067 | + # Create obstacle grid (6x6 grid excluding corners) |
| 1068 | + grid = torch.linspace(0, 1, 6) |
| 1069 | + centers = ( |
| 1070 | + torch.stack(torch.meshgrid(grid, grid, indexing="ij")).reshape(2, -1).T |
| 1071 | + ) |
| 1072 | + centers = centers[1:-1] # exclude (0,0) and (1,1) corners |
| 1073 | + self.obs_low, self.obs_high = centers - 0.05, centers + 0.05 |
| 1074 | + self.obs_cost = 20 |
| 1075 | + self._bounds = [(0.0, 1.0)] * self.dim |
| 1076 | + self.continuous_inds = list(range(self.dim)) |
| 1077 | + |
| 1078 | + super().__init__(negate=negate) |
| 1079 | + |
| 1080 | + def _is_in_obstacle(self, points: Tensor) -> Tensor: |
| 1081 | + """Check if points collide with any obstacle. |
| 1082 | +
|
| 1083 | + Args: |
| 1084 | + points: Tensor of shape (n_points, dim) containing positions to check. |
| 1085 | +
|
| 1086 | + Returns: |
| 1087 | + Boolean tensor of shape (n_points,) where True indicates collision. |
| 1088 | + """ |
| 1089 | + if points.dim() == 1: |
| 1090 | + points = points.unsqueeze(0) |
| 1091 | + obs_low = self.obs_low.to(points.device) |
| 1092 | + obs_high = self.obs_high.to(points.device) |
| 1093 | + # Shape: (n_points, n_obstacles, 2) -> check all obstacles at once |
| 1094 | + in_box = (points.unsqueeze(1) >= obs_low) & (points.unsqueeze(1) <= obs_high) |
| 1095 | + return in_box.all(dim=-1).any(dim=-1) |
| 1096 | + |
| 1097 | + def _build_waypoints(self, params: Tensor) -> Tensor: |
| 1098 | + """Build waypoints from optimization parameters. |
| 1099 | +
|
| 1100 | + Args: |
| 1101 | + params: Flat tensor of parameters with shape (num_waypoints * 2,). |
| 1102 | +
|
| 1103 | + Returns: |
| 1104 | + Tensor of shape (n_waypoints, 2) containing the full |
| 1105 | + trajectory waypoints including start and goal. The number of |
| 1106 | + waypoints may be less than num_waypoints + 2 if the goal is |
| 1107 | + reached early. |
| 1108 | + """ |
| 1109 | + device, dtype = params.device, params.dtype |
| 1110 | + start = self.start.to(device=device, dtype=dtype) |
| 1111 | + goal = self.goal.to(device=device, dtype=dtype) |
| 1112 | + |
| 1113 | + step_params = params.reshape((self.num_waypoints, 2)) |
| 1114 | + waypoints = [start] |
| 1115 | + current_pos = start.clone() |
| 1116 | + |
| 1117 | + for i in range(self.num_waypoints): |
| 1118 | + to_goal = goal - current_pos |
| 1119 | + dist = torch.linalg.norm(to_goal) |
| 1120 | + |
| 1121 | + # If we've reached the goal, terminate early |
| 1122 | + if dist <= 1e-6: |
| 1123 | + break |
| 1124 | + |
| 1125 | + perturbation = (step_params[i] - 0.5) * 2 * self.max_step_size |
| 1126 | + ideal_step = min(dist.item() / (self.num_waypoints - i), self.max_step_size) |
| 1127 | + step = to_goal / dist * ideal_step + perturbation |
| 1128 | + step_norm = torch.linalg.norm(step) |
| 1129 | + if step_norm > self.max_step_size: |
| 1130 | + step = step * (self.max_step_size / step_norm) |
| 1131 | + |
| 1132 | + current_pos = (current_pos + step).clamp(0, 1) |
| 1133 | + waypoints.append(current_pos.clone()) |
| 1134 | + |
| 1135 | + waypoints.append(goal) |
| 1136 | + return torch.stack(waypoints) |
| 1137 | + |
| 1138 | + def _interpolate_trajectory( |
| 1139 | + self, waypoints: Tensor, n_samples: int = 1000 |
| 1140 | + ) -> Tensor: |
| 1141 | + """Interpolate dense trajectory points between waypoints. |
| 1142 | +
|
| 1143 | + Args: |
| 1144 | + waypoints: Tensor of shape (n_waypoints, dim) containing waypoint positions. |
| 1145 | + n_samples: Number of points to sample along the interpolated trajectory. |
| 1146 | +
|
| 1147 | + Returns: |
| 1148 | + Tensor of shape (n_samples, dim) containing trajectory points. |
| 1149 | + """ |
| 1150 | + device, dtype = waypoints.device, waypoints.dtype |
| 1151 | + |
| 1152 | + if self.use_smooth_interp: |
| 1153 | + # Remove duplicate waypoints for spline stability |
| 1154 | + dists = torch.linalg.norm(waypoints[1:] - waypoints[:-1], dim=1) |
| 1155 | + unique_mask = torch.cat( |
| 1156 | + [torch.ones(1, dtype=torch.bool, device=device), dists > 1e-6] |
| 1157 | + ) |
| 1158 | + unique_waypoints = waypoints[unique_mask] |
| 1159 | + |
| 1160 | + if len(unique_waypoints) >= 4: |
| 1161 | + wp_np = unique_waypoints.cpu().numpy() |
| 1162 | + tck, _ = si.splprep(wp_np.T, k=3, s=0) |
| 1163 | + t = torch.linspace(0, 1, n_samples).numpy() |
| 1164 | + points_np = np.array(si.splev(t, tck)) |
| 1165 | + return torch.tensor(points_np, device=device, dtype=dtype).T.clamp(0, 1) |
| 1166 | + |
| 1167 | + # Linear interpolation (pure PyTorch) |
| 1168 | + n_seg = len(waypoints) - 1 |
| 1169 | + t = torch.linspace(0, 1, n_samples // n_seg, device=device, dtype=dtype)[ |
| 1170 | + :-1, None |
| 1171 | + ] |
| 1172 | + # segments shape: (n_seg, n_samples_per_seg, 2) |
| 1173 | + # Each segment[i] has interpolated points between waypoints[i] and [i+1] |
| 1174 | + segments = (1 - t) * waypoints[:-1, None, :] + t * waypoints[1:, None, :] |
| 1175 | + return segments.reshape(-1, 2) |
| 1176 | + |
| 1177 | + def _evaluate_trajectory(self, params: Tensor) -> Tensor: |
| 1178 | + """Evaluate trajectory cost by integrating distance through obstacles. |
| 1179 | +
|
| 1180 | + Args: |
| 1181 | + params: Flat tensor of trajectory parameters. |
| 1182 | +
|
| 1183 | + Returns: |
| 1184 | + Integrated obstacle cost scaled by self.obs_cost. |
| 1185 | + """ |
| 1186 | + waypoints = self._build_waypoints(params) |
| 1187 | + points = self._interpolate_trajectory(waypoints, n_samples=1000) |
| 1188 | + collisions = self._is_in_obstacle(points) |
| 1189 | + segment_lengths = torch.linalg.norm(points[1:] - points[:-1], dim=1) |
| 1190 | + collision_weights = 0.5 * (collisions[:-1].float() + collisions[1:].float()) |
| 1191 | + return torch.sum(segment_lengths * collision_weights) * self.obs_cost |
| 1192 | + |
| 1193 | + def _evaluate_true(self, X: Tensor) -> Tensor: |
| 1194 | + """Evaluate the objective function on a batch of inputs.""" |
| 1195 | + if X.ndim == 1: |
| 1196 | + X = X.unsqueeze(0) |
| 1197 | + return torch.stack([self._evaluate_trajectory(x) for x in X]) |
| 1198 | + |
| 1199 | + |
993 | 1200 | # ------------ Constrained synthetic test functions ----------- # |
994 | 1201 |
|
995 | 1202 |
|
|
0 commit comments