|
| 1 | +""" |
| 2 | +unscented_kalman_filter_localizer.py |
| 3 | +
|
| 4 | +Author: Bruno DOKPOMIWA |
| 5 | +""" |
| 6 | + |
| 7 | +import sys |
| 8 | +import numpy as np |
| 9 | +import scipy.linalg as spl |
| 10 | +from pathlib import Path |
| 11 | +from math import cos, sin, sqrt, atan2, pi |
| 12 | + |
| 13 | +sys.path.append(str(Path(__file__).absolute().parent) + "/../../state") |
| 14 | +sys.path.append(str(Path(__file__).absolute().parent) + "/../../array") |
| 15 | +sys.path.append(str(Path(__file__).absolute().parent) + "/../../sensors/gnss") |
| 16 | +from state import State |
| 17 | +from xy_array import XYArray |
| 18 | + |
| 19 | + |
| 20 | +class UnscentedKalmanFilterLocalizer: |
| 21 | + """ |
| 22 | + Self localization by Unscented Kalman Filter class |
| 23 | + """ |
| 24 | + |
| 25 | + def __init__(self, accel_noise=0.2, yaw_rate_noise=10.0, |
| 26 | + obsrv_x_noise=1.0, obsrv_y_noise=1.0, |
| 27 | + alpha=0.001, beta=2, kappa=0, color='r'): |
| 28 | + """ |
| 29 | + Constructor |
| 30 | + accel_noise: Standard deviation of acceleration noise |
| 31 | + yaw_rate_noise: Standard deviation of yaw rate noise[deg] |
| 32 | + obsrv_x_noise: Standard deviation of x observation noise |
| 33 | + obsrv_y_noise: Standard deviation of y observation noise |
| 34 | + alpha: UKF's alpha parameter (controls spread of sigma points) |
| 35 | + beta: UKF's beta parameter (incorporates prior knowledge) |
| 36 | + kappa: UKF's kappa parameter (secondary scaling parameter) |
| 37 | + color: Color of drawing error covariance ellipse |
| 38 | + """ |
| 39 | + |
| 40 | + # state dimension (x, y, yaw, speed) |
| 41 | + self.DIM_NUM = 4 |
| 42 | + |
| 43 | + # UKF parameters |
| 44 | + self.ALPHA = alpha |
| 45 | + self.BETA = beta |
| 46 | + self.KAPPA = kappa |
| 47 | + self._decide_sigma_weights() |
| 48 | + |
| 49 | + # noise covariance matrices |
| 50 | + self.INPUT_NOISE_VAR_MAT = np.diag([accel_noise, np.deg2rad(yaw_rate_noise)]) ** 2 |
| 51 | + self.OBSRV_NOISE_VAR_MAT = np.diag([obsrv_x_noise, obsrv_y_noise]) ** 2 |
| 52 | + |
| 53 | + # initialize state and covariance |
| 54 | + self.state = np.zeros((self.DIM_NUM, 1)) |
| 55 | + self.cov_mat = np.eye(self.DIM_NUM) |
| 56 | + |
| 57 | + self.DRAW_COLOR = color |
| 58 | + |
| 59 | + def _decide_sigma_weights(self): |
| 60 | + """ |
| 61 | + Private function to decide weights for each sigma points |
| 62 | + """ |
| 63 | + |
| 64 | + self.LAMBDA = self.ALPHA**2 * (self.DIM_NUM + self.KAPPA) - self.DIM_NUM |
| 65 | + |
| 66 | + # for 2n + 1 sigma points |
| 67 | + state_weights, cov_weights = [], [] |
| 68 | + DIM_LAMBDA = self.DIM_NUM + self.LAMBDA |
| 69 | + state_weights.append(self.LAMBDA / DIM_LAMBDA) # i = 0 |
| 70 | + cov_weights.append((self.LAMBDA / DIM_LAMBDA) + (1 - self.ALPHA**2 + self.BETA)) # i = 0 |
| 71 | + for i in range(2 * self.DIM_NUM): |
| 72 | + state_weights.append(1 / (2 * DIM_LAMBDA)) |
| 73 | + cov_weights.append(1 / (2 * DIM_LAMBDA)) |
| 74 | + self.STATE_WEIGHTS = np.array([state_weights]) |
| 75 | + self.COV_WEIGHTS = np.array([cov_weights]) |
| 76 | + |
| 77 | + # for generating sigma points |
| 78 | + self.GAMMA = sqrt(DIM_LAMBDA) |
| 79 | + |
| 80 | + def _generate_sigma_points(self, state, cov): |
| 81 | + """ |
| 82 | + Private function to generate sigma points |
| 83 | + state: state vector (x, y, yaw, speed) |
| 84 | + cov: covariance matrix 4x4 |
| 85 | + Return: array of sigma points (4 x 9) |
| 86 | + """ |
| 87 | + |
| 88 | + sigmas = state |
| 89 | + cov_sqr = spl.sqrtm(cov) # matrix square root |
| 90 | + |
| 91 | + # add each dimension's std to state vector |
| 92 | + # positive direction |
| 93 | + for i in range(self.DIM_NUM): |
| 94 | + sigmas = np.hstack((sigmas, state + self.GAMMA * cov_sqr[:, i:i+1])) |
| 95 | + # negative direction |
| 96 | + for i in range(self.DIM_NUM): |
| 97 | + sigmas = np.hstack((sigmas, state - self.GAMMA * cov_sqr[:, i:i+1])) |
| 98 | + return sigmas |
| 99 | + |
| 100 | + def _predict_sigmas_motion(self, sigmas, input, time_s): |
| 101 | + """ |
| 102 | + Private function to predict motion of sigma points |
| 103 | + sigmas: array of sigma points |
| 104 | + input: input vector [accel, yaw_rate] |
| 105 | + time_s: simulation interval time[sec] |
| 106 | + Return: array of motion predicted sigma points |
| 107 | + """ |
| 108 | + |
| 109 | + pred_sigmas = np.zeros_like(sigmas) |
| 110 | + for i in range(sigmas.shape[1]): |
| 111 | + pred_sigmas[:, i:i+1] = State.motion_model(sigmas[:, i:i+1], input, time_s) |
| 112 | + return pred_sigmas |
| 113 | + |
| 114 | + def _predict_state_covariance(self, pred_state, pred_sigmas): |
| 115 | + """ |
| 116 | + Private function to predict state covariance |
| 117 | + pred_state: predicted state vector |
| 118 | + pred_sigmas: array of motion predicted sigma points |
| 119 | + Return: predicted state covariance matrix |
| 120 | + """ |
| 121 | + |
| 122 | + diff = pred_sigmas - pred_state |
| 123 | + pred_cov = np.zeros((4, 4)) |
| 124 | + for i in range(pred_sigmas.shape[1]): |
| 125 | + pred_cov = pred_cov + self.COV_WEIGHTS[0, i] * diff[:, i:i+1] @ diff[:, i:i+1].T |
| 126 | + return pred_cov |
| 127 | + |
| 128 | + def _observation_model(self, state): |
| 129 | + """ |
| 130 | + Private function of observation model |
| 131 | + state: state vector (x, y, yaw, speed) |
| 132 | + Return: predicted observation data (x, y) |
| 133 | + """ |
| 134 | + |
| 135 | + return np.array([[state[0, 0]], [state[1, 0]]]) |
| 136 | + |
| 137 | + def _predict_sigmas_observation(self, sigmas): |
| 138 | + """ |
| 139 | + Private function to predict observation at each sigma point |
| 140 | + sigmas: array of sigma points |
| 141 | + Return: predicted observation vector at each sigma point (2 x 9) |
| 142 | + """ |
| 143 | + |
| 144 | + sigmas_num = sigmas.shape[1] |
| 145 | + obv_sigmas = np.zeros((2, sigmas_num)) |
| 146 | + for i in range(sigmas_num): |
| 147 | + obv_sigmas[:, i:i+1] = self._observation_model(sigmas[:, i:i+1]) |
| 148 | + return obv_sigmas |
| 149 | + |
| 150 | + def _predict_observation_covariance(self, pred_obv, pred_obv_sigmas, obsrv_noise_cov): |
| 151 | + """ |
| 152 | + Private function to predict observation covariance |
| 153 | + pred_obv: predicted observation vector |
| 154 | + pred_obv_sigmas: predicted observation at each sigma point |
| 155 | + obsrv_noise_cov: observation noise covariance matrix |
| 156 | + Return: predicted observation covariance matrix |
| 157 | + """ |
| 158 | + |
| 159 | + diff = pred_obv_sigmas - pred_obv |
| 160 | + pred_obv_cov = obsrv_noise_cov |
| 161 | + for i in range(pred_obv_sigmas.shape[1]): |
| 162 | + pred_obv_cov = pred_obv_cov + self.COV_WEIGHTS[0, i] * diff[:, i:i+1] @ diff[:, i:i+1].T |
| 163 | + return pred_obv_cov |
| 164 | + |
| 165 | + def _calculate_correlation_matrix(self, pred_state, pred_sigmas, pred_obv, pred_obv_sigmas): |
| 166 | + """ |
| 167 | + Private function to calculate correlation matrix between state and observation |
| 168 | + pred_state: predicted state vector |
| 169 | + pred_sigmas: motion predicted sigma points |
| 170 | + pred_obv: predicted observation vector |
| 171 | + pred_obv_sigmas: predicted observation at each sigma point |
| 172 | + Return: correlation matrix between state and observation |
| 173 | + """ |
| 174 | + |
| 175 | + sigmas_num = pred_sigmas.shape[1] |
| 176 | + diff_state = pred_sigmas - pred_state |
| 177 | + diff_obv = pred_obv_sigmas - pred_obv |
| 178 | + corr_mat = np.zeros((diff_state.shape[0], diff_obv.shape[0])) |
| 179 | + for i in range(sigmas_num): |
| 180 | + corr_mat = corr_mat + self.COV_WEIGHTS[0, i] * diff_state[:, i:i+1] @ diff_obv[:, i:i+1].T |
| 181 | + return corr_mat |
| 182 | + |
| 183 | + def update(self, state, accel_mps2, yaw_rate_rps, time_s, gnss): |
| 184 | + """ |
| 185 | + Function to update data |
| 186 | + state: Last estimated state data |
| 187 | + accel_mps2: Acceleration input from controller |
| 188 | + yaw_rate_rps: Yaw rate input from controller |
| 189 | + time_s: Simulation interval time[sec] |
| 190 | + gnss: GNSS observation data (x, y) |
| 191 | + Return: Estimated state data |
| 192 | + """ |
| 193 | + |
| 194 | + last_state = np.array([[state.get_x_m()], |
| 195 | + [state.get_y_m()], |
| 196 | + [state.get_yaw_rad()], |
| 197 | + [state.get_speed_mps()]]) |
| 198 | + |
| 199 | + # input with noise |
| 200 | + input_org = np.array([[accel_mps2], |
| 201 | + [yaw_rate_rps]]) |
| 202 | + input_noise = np.sqrt(self.INPUT_NOISE_VAR_MAT) @ np.random.randn(2, 1) |
| 203 | + next_input = input_org + input_noise |
| 204 | + |
| 205 | + # augment state with input noise for sigma point generation |
| 206 | + # create augmented state [x, y, yaw, speed, accel_noise, yaw_rate_noise] |
| 207 | + aug_state = np.vstack((last_state, np.zeros((2, 1)))) |
| 208 | + aug_cov = np.block([[self.cov_mat, np.zeros((4, 2))], |
| 209 | + [np.zeros((2, 4)), self.INPUT_NOISE_VAR_MAT]]) |
| 210 | + |
| 211 | + # generate sigma points from augmented state |
| 212 | + aug_sigmas = self._generate_sigma_points(aug_state, aug_cov) |
| 213 | + |
| 214 | + # extract state and input noise from augmented sigma points |
| 215 | + state_sigmas = aug_sigmas[:4, :] |
| 216 | + input_noise_sigmas = aug_sigmas[4:, :] |
| 217 | + |
| 218 | + # predict motion of sigma points with input noise |
| 219 | + pred_sigmas = np.zeros((4, aug_sigmas.shape[1])) |
| 220 | + for i in range(aug_sigmas.shape[1]): |
| 221 | + input_with_noise = next_input + input_noise_sigmas[:, i:i+1] |
| 222 | + pred_sigmas[:, i:i+1] = State.motion_model(state_sigmas[:, i:i+1], input_with_noise, time_s) |
| 223 | + |
| 224 | + # compute predicted state mean |
| 225 | + pred_state = (self.STATE_WEIGHTS @ pred_sigmas.T).T |
| 226 | + |
| 227 | + # compute predicted state covariance |
| 228 | + pred_cov = self._predict_state_covariance(pred_state, pred_sigmas) |
| 229 | + |
| 230 | + # predict observation - generate new sigma points from predicted state and covariance |
| 231 | + obv_state_sigmas = self._generate_sigma_points(pred_state, pred_cov) |
| 232 | + pred_obv_sigmas = self._predict_sigmas_observation(obv_state_sigmas) |
| 233 | + pred_obv = (self.STATE_WEIGHTS @ pred_obv_sigmas.T).T |
| 234 | + |
| 235 | + # compute predicted observation covariance |
| 236 | + pred_obv_cov = self._predict_observation_covariance(pred_obv, pred_obv_sigmas, self.OBSRV_NOISE_VAR_MAT) |
| 237 | + |
| 238 | + # compute correlation matrix between state and observation |
| 239 | + corr_mat = self._calculate_correlation_matrix(pred_state, obv_state_sigmas, pred_obv, pred_obv_sigmas) |
| 240 | + |
| 241 | + # kalman gain |
| 242 | + k = corr_mat @ np.linalg.inv(pred_obv_cov) |
| 243 | + |
| 244 | + # update |
| 245 | + inov = gnss - pred_obv |
| 246 | + est_state = pred_state + k @ inov |
| 247 | + est_cov = pred_cov - k @ pred_obv_cov @ k.T |
| 248 | + |
| 249 | + self.state = est_state |
| 250 | + self.cov_mat = est_cov |
| 251 | + |
| 252 | + return est_state |
| 253 | + |
| 254 | + def draw(self, axes, elems, pose): |
| 255 | + """ |
| 256 | + Function to draw error covariance ellipse |
| 257 | + axes: Axes objects of figure |
| 258 | + elems: List of plot object |
| 259 | + pose: Vehicle's pose[x, y, yaw] |
| 260 | + """ |
| 261 | + |
| 262 | + # extract 2x2 covariance for x and y |
| 263 | + xy_cov = self.cov_mat[:2, :2] |
| 264 | + eig_val, eig_vec = np.linalg.eig(xy_cov) |
| 265 | + if eig_val[0] >= eig_val[1]: |
| 266 | + big_idx, small_idx = 0, 1 |
| 267 | + else: |
| 268 | + big_idx, small_idx = 1, 0 |
| 269 | + a, b = sqrt(3.0 * eig_val[big_idx]), sqrt(3.0 * eig_val[small_idx]) |
| 270 | + angle = atan2(eig_vec[1, big_idx], eig_vec[0, big_idx]) |
| 271 | + |
| 272 | + t = np.arange(0, 2 * pi + 0.1, 0.1) |
| 273 | + xs = [a * cos(it) for it in t] |
| 274 | + ys = [b * sin(it) for it in t] |
| 275 | + xys = np.array([xs, ys]) |
| 276 | + xys_array = XYArray(xys) |
| 277 | + |
| 278 | + transformed_xys = xys_array.homogeneous_transformation(pose[0, 0], pose[1, 0], angle) |
| 279 | + elip_plot, = axes.plot(transformed_xys.get_x_data(), transformed_xys.get_y_data(), color=self.DRAW_COLOR) |
| 280 | + elems.append(elip_plot) |
| 281 | + |
0 commit comments