diff --git a/docs/tutorials/01_KalmanFilterTutorial.py b/docs/tutorials/01_KalmanFilterTutorial.py index 3635186f4..102d4a89d 100644 --- a/docs/tutorials/01_KalmanFilterTutorial.py +++ b/docs/tutorials/01_KalmanFilterTutorial.py @@ -118,7 +118,7 @@ # We note that it can sometimes be useful to fix our random number generator in order to probe a # particular example repeatedly. -np.random.seed(1991) +np.random.seed(19915) # %% # The :class:`~.ConstantVelocity` class creates a one-dimensional constant velocity model with diff --git a/stonesoup/functions/__init__.py b/stonesoup/functions/__init__.py index f0549c7ff..1ffbaee82 100644 --- a/stonesoup/functions/__init__.py +++ b/stonesoup/functions/__init__.py @@ -60,8 +60,8 @@ def cholesky_eps(A, lower=False): L = np.zeros(A.shape) for i in range(A.shape[0]): for j in range(i): - L[i, j] = (A[i, j] - L[i, :]@L[j, :].T) / L[j, j] - val = A[i, i] - L[i, :]@L[i, :].T + L[i, j] = (A[i, j] - L[i, :] @ L[j, :].T) / L[j, j] + val = A[i, i] - L[i, :] @ L[i, :].T L[i, i] = np.sqrt(val) if val > eps else np.sqrt(eps) if lower: @@ -70,7 +70,7 @@ def cholesky_eps(A, lower=False): return L.T -def jacobian(fun, x, **kwargs): +def jacobian(fun, x, **kwargs): """Compute Jacobian through finite difference calculation Parameters @@ -92,13 +92,14 @@ def jacobian(fun, x, **kwargs): # For numerical reasons the step size needs to large enough. Aim for 1e-8 # relative to spacing between floating point numbers for each dimension - delta = 1e8*np.spacing(x.state_vector.astype(np.float64).ravel()) + delta = 1e8 * np.spacing(x.state_vector.astype(np.float64).ravel()) # But at least 1e-8 # TODO: Is this needed? If not, note special case at zero. delta[delta < 1e-8] = 1e-8 x2 = copy.copy(x) # Create a clone of the input - x2.state_vector = np.tile(x.state_vector, ndim+1) + np.eye(ndim, ndim+1)*delta[:, np.newaxis] + x2.state_vector = (np.tile(x.state_vector, ndim + 1) + np.eye(ndim, ndim + 1) + * delta[:, np.newaxis]) x2.state_vector = x2.state_vector.view(StateVectors) F = fun(x2, **kwargs) @@ -170,9 +171,9 @@ def gauss2sigma(state, alpha=1.0, beta=2.0, kappa=None): # Can't use in place addition/subtraction as casting issues may arise when mixing float/int sigma_points[:, 1:(ndim_state + 1)] = \ - sigma_points[:, 1:(ndim_state + 1)] + sqrt_sigma*np.sqrt(c) + sigma_points[:, 1:(ndim_state + 1)] + sqrt_sigma * np.sqrt(c) sigma_points[:, (ndim_state + 1):] = \ - sigma_points[:, (ndim_state + 1):] - sqrt_sigma*np.sqrt(c) + sigma_points[:, (ndim_state + 1):] - sqrt_sigma * np.sqrt(c) # Put these sigma points into s State object list sigma_points_states = [] @@ -218,7 +219,7 @@ def sigma2gauss(sigma_points, mean_weights, covar_weights, covar_noise=None): points_diff = sigma_points - mean - covar = points_diff@(np.diag(covar_weights))@(points_diff.T) + covar = points_diff @ (np.diag(covar_weights)) @ (points_diff.T) if covar_noise is not None: covar = covar + covar_noise return mean.view(StateVector), covar.view(CovarianceMatrix) @@ -285,7 +286,8 @@ def unscented_transform(sigma_points_states, mean_weights, covar_weights, # Calculate cross-covariance cross_covar = ( - (sigma_points-sigma_points[:, 0:1]) @ np.diag(covar_weights) @ (sigma_points_t-mean).T + (sigma_points - sigma_points[:, 0:1]) @ np.diag(covar_weights) + @ (sigma_points_t - mean).T ).view(CovarianceMatrix) return mean, covar, cross_covar, sigma_points_t, mean_weights, covar_weights @@ -308,7 +310,7 @@ def cart2pol(x, y): """ - rho = np.sqrt(x**2 + y**2) + rho = np.sqrt(x ** 2 + y ** 2) phi = np.arctan2(y, x) return (rho, phi) @@ -333,9 +335,9 @@ def cart2sphere(x, y, z): """ - rho = np.sqrt(x**2 + y**2 + z**2) + rho = np.sqrt(x ** 2 + y ** 2 + z ** 2) phi = np.arctan2(y, x) - theta = np.arcsin(z/rho) + theta = np.arcsin(z / rho) return (rho, phi, theta) @@ -424,7 +426,7 @@ def cart2az_el_rg(x, y, z): (float, float, float) A tuple of the form `(phi, theta, rho)` """ - rho = np.sqrt(x**2 + y**2 + z**2) + rho = np.sqrt(x ** 2 + y ** 2 + z ** 2) phi = np.arcsin(x / rho) theta = np.arcsin(y / rho) return phi, theta, rho @@ -590,9 +592,9 @@ def gm_sample(means, covars, size, weights=None): means = means.view(StateVectors) if isinstance(means, StateVectors) and weights is None: - weights = np.array([1/means.shape[1]]*means.shape[1]) + weights = np.array([1 / means.shape[1]] * means.shape[1]) elif weights is None: - weights = np.array([1/len(means)]*len(means)) + weights = np.array([1 / len(means)] * len(means)) n_samples = np.random.multinomial(size, weights) samples = np.vstack([np.random.multivariate_normal(mean.ravel(), covar, sample) @@ -621,7 +623,7 @@ def gm_reduce_single(means, covars, weights): The covariance of the reduced/single Gaussian """ # Normalise weights such that they sum to 1 - weights = weights/Probability.sum(weights) + weights = weights / Probability.sum(weights) # Cast means as a StateVectors, so this works with ndarray types means = means.view(StateVectors) @@ -631,7 +633,8 @@ def gm_reduce_single(means, covars, weights): # Calculate covar delta_means = means - mean - covar = np.sum(covars*weights, axis=2, dtype=np.float64) + weights*delta_means@delta_means.T + covar = (np.sum(covars * weights, axis=2, dtype=np.float64) + weights + * delta_means @ delta_means.T) return mean.view(StateVector), covar.view(CovarianceMatrix) @@ -651,7 +654,7 @@ def mod_bearing(x): Angle in radians in the range math: :math:`-\pi` to :math:`+\pi` """ - x = (x+np.pi) % (2.0*np.pi)-np.pi + x = (x + np.pi) % (2.0 * np.pi) - np.pi return x @@ -670,8 +673,8 @@ def mod_elevation(x): float Angle in radians in the range math: :math:`-\pi/2` to :math:`+\pi/2` """ - x = x % (2*np.pi) # limit to 2*pi - N = x//(np.pi/2) # Count # of 90 deg multiples + x = x % (2 * np.pi) # limit to 2*pi + N = x // (np.pi / 2) # Count # of 90 deg multiples if N == 1: x = np.pi - x elif N == 2: @@ -762,9 +765,9 @@ def dotproduct(a, b): # Decide whether this is a StateVector or a StateVectors if type(a) is StateVector and type(b) is StateVector: - return np.sum(a*b) + return np.sum(a * b) elif type(a) is StateVectors and type(b) is StateVectors: - return np.atleast_2d(np.asarray(np.sum(a*b, axis=0))) + return np.atleast_2d(np.asarray(np.sum(a * b, axis=0))) else: raise ValueError("Inputs must be `StateVector` or `StateVectors` and of the same type") @@ -794,5 +797,65 @@ def sde_euler_maruyama_integration(fun, t_values, state_x0): delta_t = next_t - t delta_w = np.random.normal(scale=np.sqrt(delta_t), size=(state_x.ndim, 1)) a, b = fun(state_x, t) - state_x.state_vector = state_x.state_vector + a*delta_t + b@delta_w + state_x.state_vector = state_x.state_vector + a * delta_t + b @ delta_w return state_x.state_vector + + +def slr_definition(state, pred_func, func=None, force_symmetric_covariance=True): + """ Statistical linear regression (SLR) is a method to linearise a nonlinear function, + such as that representing dynamics or measurement. The SLR parameters are the linear + coefficients that approximate the function in the vicinity of the state pdf. The SLR + parameters are obtained by computing the cross-covariance and the covariance of the + predicted quantities, based on a set of sigma points as implemented by the prediction + function 'fun'. The method presented here adapts the definition (9)-(11) as found in [1]. + + References + ---------- + [1] Á. F. García-Fernández, L. Svensson and S. Särkkä, "Iterated Posterior Linearization + Smoother," in IEEE Transactions on Automatic Control, vol. 62, no. 4, pp. 2056-2063, + April 2017, doi: 10.1109/TAC.2016.2592681. + + Parameters + ---------- + state: :class:`~.State` + State object containing the state pdf. + pred_func : callable + Function that predicts the required statistical quantities. + func : callable, optional + User-specified function that manipulates the bias vector 'b_plus'. + force_symmetry : bool + Parameter that enforces symmetry in the covariance matrix 'Omega_plus'. + + Returns + ------- + :class:`~.Matrix` + The SLR parameter 'H_plus' representing the linear transformation matrix. + :class:`~.StateVector` + The SLR parameter 'b_plus' representing the bias vector. + :class:`~.CovarianceMatrix` + The SLR parameter 'Omega_plus' representing the noise covariance matrix. + """ + + # Get the first two moments of the state pdf + x_bar = state.state_vector # mean + p_matrix = state.covar # covariance + + # Get the predicted quantities based on a prediction function 'fun' + # implemented through a set of sigma points + prediction = pred_func(state) + z_bar = prediction.state_vector.astype(float) # mean + psi = prediction.cross_covar # cross-covariance + phi = prediction.covar # covariance + + # Compute the SLR parameters + H_plus = psi.T @ np.linalg.pinv(p_matrix) + b_plus = z_bar - H_plus @ x_bar + Omega_plus = phi - H_plus @ p_matrix @ H_plus.T + + if func is not None: + b_plus = func(b_plus) + + if force_symmetric_covariance: + Omega_plus = (Omega_plus + Omega_plus.T) / 2 + + return H_plus, b_plus.view(StateVector), Omega_plus.view(CovarianceMatrix) diff --git a/stonesoup/functions/tests/test_functions.py b/stonesoup/functions/tests/test_functions.py index 2b168121c..9d9b4c05f 100644 --- a/stonesoup/functions/tests/test_functions.py +++ b/stonesoup/functions/tests/test_functions.py @@ -3,19 +3,23 @@ from numpy import deg2rad from scipy.linalg import cholesky, LinAlgError from pytest import approx, raises +import datetime from .. import ( cholesky_eps, jacobian, gm_reduce_single, mod_bearing, mod_elevation, gauss2sigma, - rotx, roty, rotz, cart2sphere, cart2angles, pol2cart, sphere2cart, dotproduct, gm_sample) -from ...types.array import StateVector, StateVectors, Matrix + rotx, roty, rotz, cart2sphere, cart2angles, pol2cart, sphere2cart, dotproduct, gm_sample, + slr_definition) +from ...types.array import StateVector, StateVectors, Matrix, CovarianceMatrix from ...types.state import State, GaussianState +from ...types.prediction import GaussianMeasurementPrediction +from ...types.update import GaussianStateUpdate def test_cholesky_eps(): matrix = np.array([[0.4, -0.2, 0.1], [0.3, 0.1, -0.2], [-0.3, 0.0, 0.4]]) - matrix = matrix@matrix.T + matrix = matrix @ matrix.T cholesky_matrix = cholesky(matrix) @@ -25,10 +29,10 @@ def test_cholesky_eps(): def test_cholesky_eps_bad(): matrix = np.array( - [[ 0.05201447, 0.02882126, -0.00569971, -0.00733617], # noqa: E201 - [ 0.02882126, 0.01642966, -0.00862847, -0.00673035], # noqa: E201 - [-0.00569971, -0.00862847, 0.06570757, 0.03251551], - [-0.00733617, -0.00673035, 0.03251551, 0.01648615]]) + [[0.05201447, 0.02882126, -0.00569971, -0.00733617], # noqa: E201 + [0.02882126, 0.01642966, -0.00862847, -0.00673035], # noqa: E201 + [-0.00569971, -0.00862847, 0.06570757, 0.03251551], + [-0.00733617, -0.00673035, 0.03251551, 0.01648615]]) with raises(LinAlgError): cholesky(matrix) cholesky_eps(matrix) @@ -41,7 +45,7 @@ def test_jacobian(): state_mean = StateVector([[3.0], [1.0]]) def f(x): - return np.array([[1, 1], [0, 1]])@x.state_vector + return np.array([[1, 1], [0, 1]]) @ x.state_vector jac = jacobian(f, State(state_mean)) assert np.allclose(jac, np.array([[1, 1], [0, 1]])) @@ -53,23 +57,23 @@ def test_jacobian2(): # Sample functions to compute Jacobian on def fun(x): """ function for testing scalars i.e. scalar input, scalar output""" - return 2*x.state_vector**2 + return 2 * x.state_vector ** 2 def fun1d(ins): """ test function with vector input, scalar output""" - out = 2*ins.state_vector[0, :]+3*ins.state_vector[1, :] + out = 2 * ins.state_vector[0, :] + 3 * ins.state_vector[1, :] return np.atleast_2d(out) def fun2d(vec): """ test function with 2d input and 2d output""" out = np.empty(vec.state_vector.shape) - out[0, :] = 2*vec.state_vector[0, :]**2 + 3*vec.state_vector[1, :]**2 - out[1, :] = 2*vec.state_vector[0, :]+3*vec.state_vector[1, :] + out[0, :] = 2 * vec.state_vector[0, :] ** 2 + 3 * vec.state_vector[1, :] ** 2 + out[1, :] = 2 * vec.state_vector[0, :] + 3 * vec.state_vector[1, :] return out x = 3 jac = jacobian(fun, State(StateVector([[x]]))) - assert np.allclose(jac, 4*x) + assert np.allclose(jac, 4 * x) x = StateVector([[1], [2]]) # Tolerance value to use to test if arrays are equal @@ -78,12 +82,12 @@ def fun2d(vec): jac = jacobian(fun1d, State(x)) T = np.array([2.0, 3.0]) - FOM = np.where(np.abs(jac-T) > tol) + FOM = np.where(np.abs(jac - T) > tol) # Check # of array elements bigger than tol assert len(FOM[0]) == 0 jac = jacobian(fun2d, State(x)) - T = np.array([[4.0*x[0], 6*x[1]], + T = np.array([[4.0 * x[0], 6 * x[1]], [2, 3]]) FOM = np.where(np.abs(jac - T) > tol) # Check # of array elements bigger than tol @@ -96,7 +100,7 @@ def test_jacobian_param(): # Sample functions to compute Jacobian on def fun(x, value=0.0): """ function for jabcobian parameter passing""" - return value*x.state_vector + return value * x.state_vector x = 4 value = 2.0 @@ -109,14 +113,13 @@ def test_jacobian_large_values(): state = State(StateVector([[1E10], [1.0]])) def f(x): - return x.state_vector**2 + return x.state_vector ** 2 jac = jacobian(f, state) assert np.allclose(jac, np.array([[2e10, 0.0], [0.0, 2.0]])) def test_gm_reduce_single(): - means = StateVectors([StateVector([1, 2]), StateVector([3, 4]), StateVector([5, 6])]) covars = np.stack([[[1, 1], [1, 0.7]], [[1.2, 1.4], [1.3, 2]], @@ -162,8 +165,8 @@ def test_elevation(): @pytest.mark.parametrize( "mean", [ - 1, # int - 1.0 # float + 1, # int + 1.0 # float ] ) def test_gauss2sigma(mean): @@ -173,15 +176,15 @@ def test_gauss2sigma(mean): sigma_points_states, mean_weights, covar_weights = gauss2sigma(state, kappa=0) for n, sigma_point_state in zip((0, 1, -1), sigma_points_states): - assert sigma_point_state.state_vector[0, 0] == approx(mean + n*covar**0.5) + assert sigma_point_state.state_vector[0, 0] == approx(mean + n * covar ** 0.5) def test_gauss2sigma_bad_covar(): covar = np.array( - [[ 0.05201447, 0.02882126, -0.00569971, -0.00733617], # noqa: E201 - [ 0.02882126, 0.01642966, -0.00862847, -0.00673035], # noqa: E201 - [-0.00569971, -0.00862847, 0.06570757, 0.03251551], - [-0.00733617, -0.00673035, 0.03251551, 0.01648615]]) + [[0.05201447, 0.02882126, -0.00569971, -0.00733617], # noqa: E201 + [0.02882126, 0.01642966, -0.00862847, -0.00673035], # noqa: E201 + [-0.00569971, -0.00862847, 0.06570757, 0.03251551], + [-0.00733617, -0.00673035, 0.03251551, 0.01648615]]) state = GaussianState([[0], [0], [0], [0]], covar) with pytest.warns(UserWarning, match="Matrix is not positive definite"): @@ -192,19 +195,18 @@ def test_gauss2sigma_bad_covar(): "angle", [ ( - np.array([np.pi]), # angle - np.array([np.pi / 2]), - np.array([-np.pi]), - np.array([-np.pi / 2]), - np.array([np.pi / 4]), - np.array([-np.pi / 4]), - np.array([np.pi / 8]), - np.array([-np.pi / 8]), + np.array([np.pi]), # angle + np.array([np.pi / 2]), + np.array([-np.pi]), + np.array([-np.pi / 2]), + np.array([np.pi / 4]), + np.array([-np.pi / 4]), + np.array([np.pi / 8]), + np.array([-np.pi / 8]), ) ] ) def test_rotations(angle): - c, s = np.cos(angle), np.sin(angle) zero = np.zeros_like(angle) one = np.ones_like(angle) @@ -233,7 +235,6 @@ def test_rotations(angle): ] ) def test_cart_sphere_inversions(x, y, z): - rho, phi, theta = cart2sphere(x, y, z) # Check sphere2cart(cart2sphere(cart)) == cart @@ -258,10 +259,9 @@ def test_cart_sphere_inversions(x, y, z): (Matrix([[1, 0], [0, 1]]), Matrix([[3, 1], [1, -3]])), (StateVectors([[1, 0], [0, 1]]), StateVectors([[3, 1], [1, -3]])), (StateVectors([[1, 0], [0, 1]]), StateVector([3, 1])) - ] + ] ) def test_dotproduct(state_vector1, state_vector2): - # Test that they raise the right error if not 1d, i.e. vectors if type(state_vector1) is not type(state_vector2): with pytest.raises(ValueError): @@ -289,46 +289,53 @@ def test_dotproduct(state_vector1, state_vector2): "means, covars, weights, size", [ ( - [np.array([10, 10]), np.array([20, 20]), np.array([30, 30])], # means - [np.eye(2), np.eye(2), np.eye(2)], # covars - np.array([1/3]*3), # weights - 20 # size - ), ( - StateVectors(np.array([[20, 30, 40, 50], [20, 30, 40, 50]])), # means - [np.eye(2), np.eye(2), np.eye(2), np.eye(2)], # covars - np.array([1/4]*4), # weights - 20 # size - ), ( - [np.array([10, 10]), np.array([20, 20]), np.array([30, 30])], # means - np.array([np.eye(2), np.eye(2), np.eye(2)]), # covars - np.array([1/3]*3), # weights - 20 # size - ), ( - [StateVector(np.array([10, 10])), StateVector(np.array([20, 20])), - StateVector(np.array([30, 30]))], # means - [np.eye(2), np.eye(2), np.eye(2)], # covars - np.array([1/3]*3), # weights - 20 # size - ), ( - StateVector(np.array([10, 10])), # means - [np.eye(2)], # covars - np.array([1]), # weights - 20 # size - ), ( - np.array([10, 10]), # means - [np.eye(2)], # covars - np.array([1]), # weights - 20 # size - ), ( - [np.array([10, 10]), np.array([20, 20]), np.array([30, 30])], # means - [np.eye(2), np.eye(2), np.eye(2)], # covars - None, # weights - 20 # size - ), ( - StateVectors(np.array([[20, 30, 40, 50], [20, 30, 40, 50]])), # means - [np.eye(2), np.eye(2), np.eye(2), np.eye(2)], # covars - None, # weights - 20 # size + [np.array([10, 10]), np.array([20, 20]), np.array([30, 30])], # means + [np.eye(2), np.eye(2), np.eye(2)], # covars + np.array([1 / 3] * 3), # weights + 20 # size + ), + ( + StateVectors(np.array([[20, 30, 40, 50], [20, 30, 40, 50]])), # means + [np.eye(2), np.eye(2), np.eye(2), np.eye(2)], # covars + np.array([1 / 4] * 4), # weights + 20 # size + ), + ( + [np.array([10, 10]), np.array([20, 20]), np.array([30, 30])], # means + np.array([np.eye(2), np.eye(2), np.eye(2)]), # covars + np.array([1 / 3] * 3), # weights + 20 # size + ), + ( + [StateVector(np.array([10, 10])), StateVector(np.array([20, 20])), + StateVector(np.array([30, 30]))], # means + [np.eye(2), np.eye(2), np.eye(2)], # covars + np.array([1 / 3] * 3), # weights + 20 # size + ), + ( + StateVector(np.array([10, 10])), # means + [np.eye(2)], # covars + np.array([1]), # weights + 20 # size + ), + ( + np.array([10, 10]), # means + [np.eye(2)], # covars + np.array([1]), # weights + 20 # size + ), + ( + [np.array([10, 10]), np.array([20, 20]), np.array([30, 30])], # means + [np.eye(2), np.eye(2), np.eye(2)], # covars + None, # weights + 20 # size + ), + ( + StateVectors(np.array([[20, 30, 40, 50], [20, 30, 40, 50]])), # means + [np.eye(2), np.eye(2), np.eye(2), np.eye(2)], # covars + None, # weights + 20 # size ) ], ids=["mean_list", "mean_statevectors", "3d_covar_array", "mean_statevector_list", "single_statevector_mean", "single_ndarray_mean", "no_weight_mean_list", @@ -344,3 +351,35 @@ def test_gm_sample(means, covars, weights, size): assert samples.shape[0] == means[0].shape[0] else: assert samples.shape[0] == means.shape[0] + + +def test_slr_definition(): + def identity(state): + return GaussianMeasurementPrediction( + state_vector=state.state_vector, + covar=state.covar, + timestamp=state.timestamp, + cross_covar=CovarianceMatrix([[1, 0], + [0, 1]])) + + time1 = datetime.datetime.now() + + posterior_state = GaussianStateUpdate( + state_vector=np.array([[7.77342961], [1.]]), + covar=CovarianceMatrix([[4.54274216e+00, -8.47168858e-16], + [-8.47168858e-16, 2.22044604e-16]]), + hypothesis=None, + timestamp=time1) + + h_matrix, b_vector, omega_cov_matrix = slr_definition(posterior_state, + identity, + force_symmetric_covariance=True) + assert np.allclose(h_matrix, + np.array([[2.20131358e-01, -4.10519515e-17], + [-4.10519515e-17, 7.65571403e-33]])) + assert np.allclose(b_vector, + np.array([[6.06225399], + [1.]])) + assert np.allclose(omega_cov_matrix, + np.array([[4.32261080e+00, -8.06116906e-16], + [-8.06116906e-16, 2.22044604e-16]])) diff --git a/stonesoup/models/measurement/linear.py b/stonesoup/models/measurement/linear.py index 3968c2b6b..05e144a98 100644 --- a/stonesoup/models/measurement/linear.py +++ b/stonesoup/models/measurement/linear.py @@ -1,9 +1,11 @@ import numpy as np +from typing import Sequence from ...base import Property -from ...types.array import CovarianceMatrix +from ...types.array import CovarianceMatrix, Matrix from ..base import LinearModel, GaussianModel from .base import MeasurementModel +from ...types.state import StateVector # TODO: Probably should call this LinearGaussianMeasurementModel @@ -82,7 +84,7 @@ def function(self, state, noise=False, **kwargs): else: noise = 0 - return self.matrix(**kwargs)@state.state_vector + noise + return self.matrix(**kwargs) @ state.state_vector + noise def covar(self, **kwargs): """Returns the measurement model noise covariance matrix. @@ -95,3 +97,114 @@ def covar(self, **kwargs): """ return self.noise_covar + + +class GeneralLinearGaussian(LinearGaussian): + r""" + An extended implementation of the linear-Gaussian measurement model described by + .. math:: + + y_k = H*x_k + b + v_k,\ \ \ \ v_k\sim \mathcal{N}(0,R) + + where :math:`H` is a measurement matrix, :math:`b` is a bias vector and :math:`v_k` + is Gaussian distributed. This class permits specification of :math:`H` in two ways: + either constructing it internally, using :attr:`~.MeasurementModel.mapping` as in + :class:`~.LinearGaussian`, or by explicitly specifying the matrix through + :attr:`~.GeneralLinearGaussian.meas_matrix`. + When both attributes are provided, the preference is given to + :attr:`~.GeneralLinearGaussian.meas_matrix`. + Furthermore, unlike :class:`~.LinearGaussian` this implementation permits a certain + bias :attr:`~.GeneralLinearGaussian.bias_value` in a measurement model. + """ + + mapping: Sequence[int] = Property(default=None, + doc="Mapping between measurement and state dimensions") + meas_matrix: Matrix = Property(default=None, + doc="Arbitrary measurement matrix") + bias_value: StateVector = Property(default=None, doc="Bias value") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.mapping is None and self.meas_matrix is None: + raise ValueError("self.mapping and self.meas_matrix cannot both be None.") + + if self.meas_matrix is not None: + if not isinstance(self.meas_matrix, Matrix): + self.meas_matrix = Matrix(self.meas_matrix) + + if self.meas_matrix is not None \ + and self.meas_matrix.shape[1] != self.ndim_state: + raise ValueError("meas_matrix should have the same number of " + "columns as the number of rows in state_vector") + + if self.bias_value is None: + self.bias_value = StateVector([0]) + + if not isinstance(self.bias_value, StateVector): + self.bias_value = StateVector(self.bias_value) + + @property + def ndim_meas(self): + """ndim_meas getter method + + Returns + ------- + :class:`int` + The number of measurement dimensions. + """ + if self.meas_matrix is None: + return super().ndim_meas # implemented via len(self.mapping) as in LinearGaussian + + return self.meas_matrix.shape[0] + + def matrix(self, **kwargs): + """Model matrix :math:`H` + + Returns + ------- + :class:`numpy.ndarray` of shape \ + (:py:attr:`~ndim_meas`, :py:attr:`~ndim_state`) + The model matrix. + """ + + if self.meas_matrix is None: + return super().matrix(**kwargs) # matrix constructed as in LinearGaussian + + return self.meas_matrix + + def bias(self, **kwargs): + """Model bias :math:`b` + + Returns + ------- + :class:`numpy.ndarray` of shape \ + (:py:attr:`~ndim_meas`, 1) + The bias value. + """ + return self.bias_value + + def function(self, state, noise=False, **kwargs): + """Model function :math:`H*x_t+b+v_k` + + Parameters + ---------- + state: :class:`~.State` + An input state + noise: :class:`numpy.ndarray` or bool + An externally generated random process noise sample (the default is + `False`, in which case no noise will be added + if 'True', the output of :meth:`~.Model.rvs` is added) + + Returns + ------- + :class:`numpy.ndarray` of shape (:py:attr:`~ndim_meas`, 1) + The model function evaluated. + """ + + if isinstance(noise, bool) or noise is None: + if noise: + noise = self.rvs(num_samples=state.state_vector.shape[1], **kwargs) + else: + noise = 0 + + return self.matrix(**kwargs) @ state.state_vector + self.bias_value + noise diff --git a/stonesoup/models/measurement/tests/test_lg.py b/stonesoup/models/measurement/tests/test_lg.py index b2fd36366..130134697 100644 --- a/stonesoup/models/measurement/tests/test_lg.py +++ b/stonesoup/models/measurement/tests/test_lg.py @@ -3,7 +3,7 @@ import numpy as np from scipy.stats import multivariate_normal -from ..linear import LinearGaussian +from ..linear import LinearGaussian, GeneralLinearGaussian from ....types.state import State @@ -106,3 +106,105 @@ def test_lgmodel(H, R, ndim_state, mapping): # Check first values produced by seed match for _ in range(3): assert all(lg1.rvs() == lg2.rvs()) + + +@pytest.mark.parametrize( + "H, R, B, ndim_state, mapping", + [ + ( # 1D meas, 2D state + np.array([[1, 0]]), + np.array([[0.1]]), + np.array([[1.0]]), + 2, + [0], + ), + ( # 2D meas, 4D state + np.array([[1, 0, 0, 0], [0, 0, 1, 0]]), + np.diag([0.1, 0.1]), + np.array([[1.0]]), + 4, + [0, 2], + ), + ( # 4D meas, 2D state + np.array([[1, 0], [0, 0], [0, 1], [0, 0]]), + np.diag([0.1, 0.1, 0.1, 0.1]), + np.array([[1.0]]), + 2, + [0, None, 1, None], + ), + ], + ids=["1D_meas:2D_state", "2D_meas:4D_state", "4D_meas:2D_state"] +) +def test_glmodel(H, R, B, ndim_state, mapping): + """ + test for the generalised linear model + """ + + state_vec = np.array([[n] for n in range(ndim_state)]) + state = State(state_vec) + + glm = GeneralLinearGaussian(ndim_state=ndim_state, + mapping=mapping, + meas_matrix=H, + bias_value=B, + noise_covar=R) + + # Ensure ```lg.transfer_function()``` returns H + assert np.array_equal(H, glm.matrix()) + + # Ensure lg.jacobian() returns H + assert np.array_equal(H, glm.jacobian(state=state)) + + # Ensure ```lg.covar()``` returns R + assert np.array_equal(R, glm.covar()) + + # Ensure the Bias is returned + assert np.array_equal(B, glm.bias()) + + # Project a state through the model + # (without noise) + meas_pred_wo_noise = glm.function(state) + assert np.array_equal(meas_pred_wo_noise, H @ state_vec + B) + + # Evaluate the likelihood of the predicted measurement, given the state + # (without noise) + prob = glm.pdf(State(meas_pred_wo_noise), state) + assert approx(prob) == multivariate_normal.pdf( + meas_pred_wo_noise.T, + mean=np.array(H @ state_vec + B).ravel(), + cov=R) + + # Propagate a state vector through the model + # (with external noise) + noise = glm.rvs() + meas_pred_w_enoise = glm.function(state, + noise=noise) + assert np.array_equal(meas_pred_w_enoise, H @ state_vec + B + noise) + + # Evaluate the likelihood of the predicted state, given the prior + # (with noise) + prob = glm.pdf(State(meas_pred_w_enoise), state) + assert approx(prob) == multivariate_normal.pdf( + meas_pred_w_enoise.T, + mean=np.array(H @ state_vec + B).ravel(), + cov=R) + + # Test random seed give consistent results + glm1 = GeneralLinearGaussian(ndim_state=ndim_state, + mapping=mapping, + meas_matrix=H, + bias_value=B, + noise_covar=R, + seed=1 + ) + glm2 = GeneralLinearGaussian(ndim_state=ndim_state, + mapping=mapping, + meas_matrix=H, + bias_value=B, + noise_covar=R, + seed=1 + ) + + # Check first values produced by seed match + for _ in range(3): + assert all(glm1.rvs() == glm2.rvs()) diff --git a/stonesoup/models/transition/linear.py b/stonesoup/models/transition/linear.py index 5c99b7977..a6a213692 100644 --- a/stonesoup/models/transition/linear.py +++ b/stonesoup/models/transition/linear.py @@ -137,8 +137,8 @@ def covar(self, time_interval, **kwargs): dt = time_interval_sec N = self.constant_derivative if N == 1: - covar = np.array([[dt**3 / 3, dt**2 / 2], - [dt**2 / 2, dt]]) + covar = np.array([[dt ** 3 / 3, dt ** 2 / 2], + [dt ** 2 / 2, dt]]) else: Fmat = self.matrix(time_interval, **kwargs) Q = np.zeros((N + 1, N + 1)) @@ -147,7 +147,7 @@ def covar(self, time_interval, **kwargs): covar = np.zeros((N + 1, N + 1)) for l in range(0, N + 1): # noqa: E741 for k in range(0, N + 1): - covar[l, k] = (igrand[l, k]*dt / (1 + N*2 - l - k)) + covar[l, k] = (igrand[l, k] * dt / (1 + N * 2 - l - k)) covar *= self.noise_diff_coeff return CovarianceMatrix(covar) @@ -316,7 +316,7 @@ def _continoustransitionmatrix(t, N, K): for i in range(0, N + 1): FCont[i, N] = np.exp(-K * t) * (-1) ** (N - i) / K ** (N - i) for n in range(1, N - i + 1): - FCont[i, N] -= (-1) ** n * t ** (N - i - n) /\ + FCont[i, N] -= (-1) ** n * t ** (N - i - n) / \ (math.factorial(N - i - n) * K ** n) for j in range(i, N): FCont[i, j] = (t ** (j - i)) / math.factorial(j - i) @@ -490,6 +490,7 @@ class SingerApproximate(Singer): @property def decay_derivative(self): return 2 + r"""This is a class implementation of a discrete, time-variant 1D Singer Transition Model, with covariance approximation applicable for smaller time intervals. @@ -542,6 +543,7 @@ def decay_derivative(self): \frac{dt^3}{6} & \frac{dt^2}{2} & dt \end{bmatrix} """ + def covar(self, time_interval, **kwargs): """Returns the transition model noise covariance matrix. @@ -562,14 +564,14 @@ def covar(self, time_interval, **kwargs): # Only leading terms get calculated for speed. covar = np.array( - [[time_interval_sec**5 / 20, - time_interval_sec**4 / 8, - time_interval_sec**3 / 6], - [time_interval_sec**4 / 8, - time_interval_sec**3 / 3, - time_interval_sec**2 / 2], - [time_interval_sec**3 / 6, - time_interval_sec**2 / 2, + [[time_interval_sec ** 5 / 20, + time_interval_sec ** 4 / 8, + time_interval_sec ** 3 / 6], + [time_interval_sec ** 4 / 8, + time_interval_sec ** 3 / 3, + time_interval_sec ** 2 / 2], + [time_interval_sec ** 3 / 6, + time_interval_sec ** 2 / 2, time_interval_sec]] ) * self.noise_diff_coeff @@ -606,7 +608,7 @@ def ndim_state(self): : :class:`int` The number of combined model state dimensions. """ - return sum(model.ndim_state for model in self.model_list)+4 + return sum(model.ndim_state for model in self.model_list) + 4 def matrix(self, time_interval, **kwargs): """Model matrix :math:`F` @@ -622,15 +624,15 @@ def matrix(self, time_interval, **kwargs): transition_matrices = [ model.matrix(time_interval) for model in self.model_list] sandwich = block_diag(z, *transition_matrices, z) - sandwich[0:2, 0:2] = np.array([[1, np.sin(turn_ratedt)/self.turn_rate], - [0, np.cos(turn_ratedt)]]) + sandwich[0:2, 0:2] = np.array([[1, np.sin(turn_ratedt) / self.turn_rate], + [0, np.cos(turn_ratedt)]]) sandwich[0:2, -2:] = np.array( - [[0, (np.cos(turn_ratedt)-1)/self.turn_rate], + [[0, (np.cos(turn_ratedt) - 1) / self.turn_rate], [0, -np.sin(turn_ratedt)]]) sandwich[-2:, 0:2] = np.array( - [[0, (1-np.cos(turn_ratedt))/self.turn_rate], + [[0, (1 - np.cos(turn_ratedt)) / self.turn_rate], [0, np.sin(turn_ratedt)]]) - sandwich[-2:, -2:] = np.array([[1, np.sin(turn_ratedt)/self.turn_rate], + sandwich[-2:, -2:] = np.array([[1, np.sin(turn_ratedt) / self.turn_rate], [0, np.cos(turn_ratedt)]]) return sandwich @@ -646,10 +648,10 @@ def covar(self, time_interval, **kwargs): q1, q2 = self.turn_noise_diff_coeffs dt = time_interval.total_seconds() covar_list = [model.covar(time_interval) for model in self.model_list] - ctc1 = np.array([[q1*dt**3/3, q1*dt**2/2], - [q1*dt**2/2, q1*dt]]) - ctc2 = np.array([[q1*dt**3/3, q1*dt**2/2], - [q1*dt**2/2, q1*dt]]) + ctc1 = np.array([[q1 * dt ** 3 / 3, q1 * dt ** 2 / 2], + [q1 * dt ** 2 / 2, q1 * dt]]) + ctc2 = np.array([[q1 * dt ** 3 / 3, q1 * dt ** 2 / 2], + [q1 * dt ** 2 / 2, q1 * dt]]) return CovarianceMatrix(block_diag(ctc1, *covar_list, ctc2)) diff --git a/stonesoup/types/prediction.py b/stonesoup/types/prediction.py index 02bd7b8fc..44fb93e26 100644 --- a/stonesoup/types/prediction.py +++ b/stonesoup/types/prediction.py @@ -1,6 +1,7 @@ import copy import datetime from typing import Sequence +from typing import MutableMapping from .array import CovarianceMatrix from .base import Type @@ -12,6 +13,7 @@ from ..base import Property from ..models.transition.base import TransitionModel from ..types.state import CreatableFromState, CompositeState +from ..models.measurement import MeasurementModel class Prediction(Type, CreatableFromState): @@ -124,6 +126,36 @@ def __init__(self, *args, **kwargs): "columns as the number of rows in state_vector") +class AugmentedMeasurementPrediction(Type, CreatableFromState): + """ Augmented Prediction type that permits measurement_model and metadata to be included. + + This is the base measurement prediction class. """ + + measurement_model: MeasurementModel = Property( + default=None, + doc="The measurement model used to generate the prediction (the default is ``None``)") + + metadata: MutableMapping = Property( + default=None, doc='Dictionary of metadata items for AugmentedMeasurementPrediction.') + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.metadata is None: + self.metadata = {} + + +class AugmentedGaussianMeasurementPrediction(AugmentedMeasurementPrediction, GaussianState): + """ AugmentedGaussianMeasurementPrediction type + + This is an extended alternative to a GaussianMeasurementPrediction class, + which also preserves the model used to + generate the prediction, and allows for metadata to be included. + """ + + cross_covar: CovarianceMatrix = Property( + default=None, doc="The state-measurement cross covariance matrix") + + # Don't need to support Sqrt Covar for MeasurementPrediction CreatableFromState.class_mapping[MeasurementPrediction][SqrtGaussianState] = \ GaussianMeasurementPrediction diff --git a/stonesoup/updater/iterated.py b/stonesoup/updater/iterated.py index 19af638ee..13ab08f52 100644 --- a/stonesoup/updater/iterated.py +++ b/stonesoup/updater/iterated.py @@ -1,5 +1,6 @@ import copy import warnings +from typing import Callable from . import Updater from ..base import Property @@ -7,12 +8,15 @@ from ..models.measurement import MeasurementModel from ..models.transition import TransitionModel from ..predictor import Predictor -from .kalman import ExtendedKalmanUpdater +from .kalman import KalmanUpdater, ExtendedKalmanUpdater, UnscentedKalmanUpdater from ..predictor.kalman import ExtendedKalmanPredictor from ..smoother import Smoother from ..smoother.kalman import ExtendedKalmanSmoother -from ..types.prediction import Prediction +from ..types.prediction import Prediction, AugmentedGaussianMeasurementPrediction from ..types.track import Track +from ..functions import slr_definition +from ..models.measurement.linear import GeneralLinearGaussian +from functools import partial class DynamicallyIteratedUpdater(Updater): @@ -137,3 +141,116 @@ def __init__(self, *args, **kwargs): ExtendedKalmanUpdater.register(DynamicallyIteratedEKFUpdater) + + +class IPLFKalmanUpdater(UnscentedKalmanUpdater): + """ + This implements the updater of the Iterated Posterior Linearization FIlter + (IPLF) described in [1]. It is obtained by iteratively linearising the + measurement function using statistical linear regression (SLR) with respect + to the posterior (rather than the prior), to take into account the information + provided by the measurement. Nevertheless, on the first iteration linearisation + is performed with respect to the prior, so the output is equivalent to that of + a standard :class:`~.UnscentedKalmanUpdater`. This class inherits most of the + functionality from :class:`~.UnscentedKalmanUpdater`. + + References + ---------- + [1] Á. F. García-Fernández, L. Svensson, M. R. Morelande and S. Särkkä, + "Posterior Linearization Filter: Principles and Implementation Using Sigma Points," + in IEEE Transactions on Signal Processing, vol. 63, no. 20, pp. 5561-5573, + Oct.15, 2015, doi: 10.1109/TSP.2015.2454485. + """ + + tolerance: float = Property( + default=1e-1, + doc="The value used as a convergence criterion.") + measure: Measure = Property( + default=KLDivergence(), + doc="The measure to use to test the convergence. Defaults to the " + "GaussianKullbackLeiblerDivergence between previous and current posteriors.") + max_iterations: int = Property( + default=5, + doc="The maximum number of iterations. " + "Setting `max_iterations=1` is equivalent to using UKF.") + slr_func: Callable = Property(default=slr_definition, + doc="Function to compute " + "the SLR parameters.") + + def update(self, hypothesis, keep_linearisation=True, + force_symmetric_covariance=True, **kwargs): + + # Get the nonlinear measurement model and its parameters + measurement_model = self._check_measurement_model(hypothesis.measurement.measurement_model) + meas_func = partial(self.predict_measurement, + measurement_model=measurement_model, + measurement_noise=False) + ndim_state = measurement_model.ndim_state + r_cov_matrix = measurement_model.covar() + + # Initial approximation for the posterior + post_state = hypothesis.prediction + + for i in range(self.max_iterations): + + # Preserve the previous approximation for convergence evaluation + prev_post_state = post_state + + # Compute the parameters of Statistical Linear Regression (SLR) + h_matrix, b_vector, omega_cov_matrix \ + = self.slr_func(post_state, meas_func, + force_symmetric_covariance=force_symmetric_covariance) + slr_parameters = { + 'h_matrix': h_matrix, + 'b_vector': b_vector, + 'omega_cov_matrix': omega_cov_matrix + } + + # Create a linear measurement model using the SLR parameters + measurement_model_linear = GeneralLinearGaussian( + ndim_state=ndim_state, + meas_matrix=slr_parameters['h_matrix'], + bias_value=slr_parameters['b_vector'], + noise_covar=slr_parameters['omega_cov_matrix']+r_cov_matrix) + + # Predict the measurement using the linearised model + measurement_prediction_linear = KalmanUpdater.predict_measurement( + self, + predicted_state=hypothesis.prediction, + measurement_model=measurement_model_linear + ) + + if keep_linearisation: + + # Store the SLR parameters + metadata = { + 'slr_parameters': slr_parameters, + 'r_cov_matrix': r_cov_matrix, + 'iteration': i, + 'max_iterations': self.max_iterations, + 'tolerance': self.tolerance + } + + # Wrap the prediction into a custom class + # that preserves the linearised model and metadata + measurement_prediction_linear = AugmentedGaussianMeasurementPrediction( + state_vector=measurement_prediction_linear.state_vector, + covar=measurement_prediction_linear.covar, + timestamp=measurement_prediction_linear.timestamp, + cross_covar=measurement_prediction_linear.cross_covar, + measurement_model=measurement_model_linear, + metadata=metadata + ) + + # Perform a linear update using the predicted measurement + hypothesis.measurement_prediction = measurement_prediction_linear + post_state = KalmanUpdater.update(self, hypothesis, **kwargs) + + if force_symmetric_covariance: + post_state.covar = (post_state.covar + post_state.covar.T) / 2 + + # KLD between the previous and current posteriors to measure convergence + if self.measure(prev_post_state, post_state) < self.tolerance: + break + + return post_state diff --git a/stonesoup/updater/tests/test_iterated.py b/stonesoup/updater/tests/test_iterated.py index 625a813ad..55cf8a0cf 100644 --- a/stonesoup/updater/tests/test_iterated.py +++ b/stonesoup/updater/tests/test_iterated.py @@ -4,7 +4,7 @@ from stonesoup.models.measurement.nonlinear import CartesianToBearingRange from stonesoup.models.transition.nonlinear import ConstantTurn -from stonesoup.predictor.kalman import ExtendedKalmanPredictor +from stonesoup.predictor.kalman import ExtendedKalmanPredictor, UnscentedKalmanPredictor from stonesoup.smoother.kalman import ExtendedKalmanSmoother from stonesoup.types.angle import Bearing from stonesoup.types.array import StateVector, CovarianceMatrix @@ -12,7 +12,7 @@ from stonesoup.types.hypothesis import SingleHypothesis from stonesoup.types.prediction import GaussianStatePrediction from stonesoup.types.state import GaussianState -from stonesoup.updater.iterated import DynamicallyIteratedUpdater +from stonesoup.updater.iterated import DynamicallyIteratedUpdater, IPLFKalmanUpdater from stonesoup.updater.kalman import ExtendedKalmanUpdater @@ -113,3 +113,98 @@ def test_diekf(): [-0.003, -0.043, 0.730, 0.548, 0.049], [-0.010, -0.037, 0.022, 0.049, 0.078]]), atol=1e-3) + + +def test_IPLF(): + + time1 = datetime.datetime.now() + time2 = time1 + datetime.timedelta(seconds=1) + time3 = time2 + datetime.timedelta(seconds=1) + + transition_model = ConstantTurn([0.05, 0.05], np.radians(2)) + + sensor_x = 50 # Placing the sensor off-centre + sensor_y = 0 + + measurement_model = CartesianToBearingRange( + ndim_state=5, + mapping=(0, 2), + noise_covar=np.diag([np.radians(0.2), 1]), # Covariance matrix. 0.2 degree variance in + # bearing and 1 metre in range + translation_offset=np.array([[sensor_x], [sensor_y]]) # Offset measurements to location of + # sensor in cartesian. + ) + + prior = GaussianState( + [[0.], [1.], [0.], [1.], [0.]], + np.diag([1.5, 0.5, 1.5, 0.5, np.radians(0.5)]), + timestamp=time1) + + prediction = GaussianStatePrediction( + state_vector=StateVector([[1.], [1.], [1.], [1.], [0.]]), + covar=CovarianceMatrix([[1.5, 0., 0., 0., 0.], + [0., 0.5, 0., 0., 0.], + [0., 0., 1.5, 0., 0.], + [0., 0., 0., 0.5, 0.], + [0., 0., 0., 0., np.radians(0.5)]]), + transition_model=transition_model, + timestamp=time2, + prior=prior) + + measurement = Detection(state_vector=StateVector([[Bearing(-3.1217136817127424)], + [47.7876225398533]]), + timestamp=time2, + measurement_model=measurement_model) + + hypothesis = SingleHypothesis(prediction=prediction, measurement=measurement) + + sub_predictor = UnscentedKalmanPredictor(transition_model) + + updater = IPLFKalmanUpdater() + + updated_state = updater.update(hypothesis=hypothesis) + + # Check timestamp is same as provided in measurement + assert updated_state.timestamp == time2 + + # Check prediction and measurement are unchanged + assert updated_state.hypothesis.prediction == prediction + assert updated_state.hypothesis.measurement == measurement + + # Check state vector is correct + assert np.allclose( + updated_state.state_vector, + StateVector([[1.736], [1.000], [0.6860], [1.000], [0.]]), + atol=1e-3) + + # Check covariance matrix is correct + assert np.allclose( + updated_state.covar, + CovarianceMatrix( + [[0.6002, -0.0, 0.009, -0.0, -0.0], + [-0.0, 0.500, 0.0, -0.0, -0.0], + [0.009, 0.0, 1.266, 0.0, -0.0], + [-0.0, -0.0, -0.0, 0.500, -0.0], + [-0.0, -0.0, -0.0, -0.0, 0.008]]), + atol=1e-3) + + prediction = sub_predictor.predict(updated_state, time3) + measurement = Detection(state_vector=StateVector([[Bearing(3.133)], [47.777]]), + timestamp=time3, + measurement_model=measurement_model) + hypothesis = SingleHypothesis(prediction=prediction, measurement=measurement) + updated_state = updater.update(hypothesis=hypothesis) + + assert np.allclose( + updated_state.state_vector, + StateVector([[2.481], [0.876], [1.457], [0.929], [0.0004]]), + atol=1e-3) + assert np.allclose( + updated_state.covar, + CovarianceMatrix( + [[0.528, 0.250, 0.023, 0.003, -0.002], + [0.250, 0.426, 0.004, -0.004, -0.007], + [0.023, 0.004, 1.455, 0.431, 0.003], + [0.003, -0.004, 0.431, 0.529, 0.008], + [-0.002, -0.007, 0.0034, 0.008, 0.043]]), + atol=1e-3)