Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions stonesoup/GaussianProcess/Cubic function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Feb 21 05:00:02 2024

@author: 007
"""
import matplotlib.pyplot as plt
from datetime import timedelta
from datetime import datetime
import numpy as np
from sklearn.metrics import mean_squared_error
from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
from stonesoup.plotter import Plotterly
# --- Monte-Carlo experiment: GP vs. distributed-GP (DGP) tracking of a target
# moving along a noisy cubic, y = 1e-4 * x**3. ---
# NOTE(review): indentation appears to have been lost in this copy of the file;
# the nesting below (what belongs to each `for`) must be re-checked against the
# original source before running.
rmse_x_list = []
rmse_y_list = []
rmsed_x_list = []
rmsed_y_list = []
seed = 233  # RNG seed forwarded to the sensor-network generator
num_sensors = 100  # number of sensors placed in the network
# Single run; increase range(1) to average RMSE over repeated experiments.
for experiment in range(1):
plotter = Plotterly()

# Ground truth: points on y = 1e-4 * (n-100)**3 for odd n in [1, 201],
# perturbed by zero-mean Gaussian noise with covariance diag(0.1, 0.1),
# timestamped one state every 2 seconds.
truth = GroundTruthPath()
start_time = datetime.now()
for n in range(1, 202, 2):
x = n - 100
y = 1e-4 * (n-100)**3
varxy = np.array([[0.1, 0.], [0., 0.1]])
xy = np.random.multivariate_normal(np.array([x, y]), varxy)
truth.append(GroundTruthState(np.array([[xy[0]], [xy[1]]]),
timestamp=start_time +
timedelta(seconds=n)))

# Plot the result
plotter.plot_ground_truths({truth}, [0, 1])
plotter.fig  # only renders inline in a notebook; a no-op when run as a script

from scipy.stats import multivariate_normal
from stonesoup.types.detection import Detection
from stonesoup.models.measurement.linear import LinearGaussian

# Measurements: truth positions plus N(0, diag(10, 10)) observation noise.
measurements = []
for state in truth:
x, y = multivariate_normal.rvs(
state.state_vector.ravel(), cov=np.diag([10., 10.]))
measurements.append(Detection(
[x, y], timestamp=state.timestamp))

# Plot the result
plotter.plot_measurements(measurements, [0, 1],
LinearGaussian(2, (0, 1), np.diag([0, 0])))
plotter.fig

from GP_Tracking import GP_track
num_steps = 100

Xm = []
Ym = []
# NOTE(review): if this loop really encloses the measurement-append loop below,
# Xm/Ym accumulate every measurement num_steps-1 times, and truth_x/truth_y are
# recomputed identically on every pass -- confirm the intended nesting.
for k in range(num_steps-1):
truth_x = [state.state_vector[0] for state in truth]
truth_y = [state.state_vector[1] for state in truth]

# Flatten measurement coordinates into plain arrays for plotting/tracking.
for measurement in measurements:
Xm.append(measurement.state_vector[0])
# Xt.append(measurement.timestamp)
Ym.append(measurement.state_vector[1])
# Yt.append(measurement.timestamp)
Xm = np.array(Xm).reshape(-1, 1)
Ym = np.array(Ym).reshape(-1, 1)

# Centralised GP tracker (project-local GP_Tracking module).
A = GP_track()
Size_win = 100 # sliding Window
x_data, x_cov, y_data, y_cov = A.tracking(measurements, Size_win)

# Plot measurements, truth, and the centralised-GP estimates.
fig = plt.figure(figsize=(10, 6))
ax2 = fig.add_subplot(1, 1, 1)
ax2.plot(Xm, Ym, 'xb', label='Measurements', markerfacecolor='None')
ax2.plot(truth_x, truth_y, 'd-k', label='Truth', markerfacecolor='None')
ax2.plot(x_data, y_data, 'r.', label='Estimates')
# plt.xlim(200, 200)
# plt.ylim(-100, 100)
ax2.set_xlabel('X (m)', fontsize=15)
ax2.set_ylabel('Y (m)', fontsize=15)
# ax2.set_title('GP Tracking Estimates', fontsize=20)
ax2.legend(fontsize=15)
truth_x = np.array(truth_x).reshape(-1, 1)
truth_y = np.array(truth_y).reshape(-1, 1)
# RMSE vs. truth; truth_x[2:] suggests the tracker output skips the first two
# states -- TODO confirm the alignment against GP_track.tracking.
# NOTE(review): mean_squared_error(..., squared=False) is deprecated in
# scikit-learn >= 1.4 and removed in 1.6; prefer root_mean_squared_error there.
rmse_x = mean_squared_error(truth_x[2:], x_data, squared=False)
rmse_y = mean_squared_error(truth_y[2:], y_data, squared=False)
print('GPX =', rmse_x)
print('GPY =', rmse_y)
rmse_x_list.append(rmse_x)
rmse_y_list.append(rmse_y)

from sensor_network import GP_Sensor
# Generate sensors and record data

SW = GP_Sensor()



min_distance = 15  # minimum spacing between sensors (m) -- presumably; verify
range_value = 30  # sensor detection radius (m)
xrange = (-200, 200)
yrange = (-100, 100)

# Random (seeded) sensor layout, then per-sensor observations of the target.
sensor_data = SW.create_sensor_network_plot(
num_sensors, range_value, min_distance, xrange, yrange, seed)
time_data1, x_data1, y_data1 = SW.track_targetDGP(Xm, Ym, sensor_data)

# Distributed GP tracker fusing the per-sensor records.
x_data, x_cov, y_data, y_cov, x_s, y_s, t_s = A.tracking_DGP(
measurements, time_data1, x_data1, y_data1)

import matplotlib.patches as mpatches
fig, ax2 = plt.subplots(figsize=(10, 6))
sensor_range = range_value
ax2.plot(Xm, Ym, 'xb', label='Measurements', markerfacecolor='None')
ax2.plot(truth_x, truth_y, 'd-k', label='Truth', markerfacecolor='None')
ax2.plot(x_data, y_data, 'r.', label='DGP Estimates')

# Create custom legend entries
sensor_legend = mpatches.Patch(color='orange', label='Sensor')
range_legend = mpatches.Patch(color='blue', alpha=0.1,
label='Sensor Range')

# Plot each sensor's position and range
for sensor in sensor_data:
position = sensor["position"]
ax2.scatter(position[0], position[1], c='orange', edgecolor='black')
sensor_circle = plt.Circle((
position[0], position[1]),
sensor_range, color='blue', alpha=0.8, fill=False)
ax2.add_patch(sensor_circle)

# Only label the first sensor and first range to avoid duplicate labels
ax2.scatter([], [], c='orange', label='Sensor',
edgecolor='black') # Invisible point for legend

ax2.add_patch(mpatches.Circle((0, 0), 1, edgecolor='blue',
facecolor='none', alpha=0.8,
label='Detection Range'))
plt.xlim(-120, 105)
plt.ylim(-120, 120)
ax2.set_xlabel('X (m)', fontsize=15)
ax2.set_ylabel('Y (m)', fontsize=15)

# Extract existing handles and labels
handles, labels = ax2.get_legend_handles_labels()

# Add the custom legend entries
handles.extend([sensor_legend, range_legend])

# Create the legend with the custom and existing entries
ax2.legend(handles=handles, labels=labels, fontsize=15)

# ax2.set_title('Sensor Positions and DGP Estimates', fontsize=20)
# DGP RMSE; truth_x[3:] drops the first three states -- TODO confirm the
# alignment with tracking_DGP's output length.
rmse_x = mean_squared_error(truth_x[3:], x_data, squared=False)
rmse_y = mean_squared_error(truth_y[3:], y_data, squared=False)
print('DGPX =', rmse_x)
print('DGPY =', rmse_y)
rmsed_x_list.append(rmse_x)
rmsed_y_list.append(rmse_y)

def compute_sensor_coverage(truth, sensor_data, range_value):
    """Return the percentage of ground-truth states seen by the network.

    A state counts as covered when its (x, y) position lies within
    ``range_value`` of at least one sensor in ``sensor_data``.

    Args:
        truth: iterable of states exposing a 2-element ``state_vector``.
        sensor_data: iterable of dicts, each with a ``"position"`` pair.
        range_value: detection radius of every sensor.

    Returns:
        Coverage as a percentage in [0, 100].
    """
    def _within_range(px, py):
        # True as soon as any sensor is close enough to see (px, py).
        return any(
            np.sqrt((px - sx) ** 2 + (py - sy) ** 2) <= range_value
            for sx, sy in (node["position"] for node in sensor_data)
        )

    hits = sum(1 for gt in truth if _within_range(*gt.state_vector.ravel()))
    return (hits / len(truth)) * 100

# Report what fraction of the truth trajectory the sensor network can observe.
coverage = compute_sensor_coverage(truth, sensor_data, range_value)
print(f"cover: {coverage:.2f}%")
148 changes: 148 additions & 0 deletions stonesoup/GaussianProcess/GP.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import numpy as np
import matplotlib.pyplot as plt
from numpy.linalg import inv
from numpy.linalg import cholesky
from scipy.linalg import solve_triangular
from scipy.optimize import minimize
from GP_kernel import KernelFunctions


class GaussianProcess:
    """Gaussian-process (GP) regression with selectable kernels.

    Provides a centralised GP posterior, a per-partition "distributed"
    posterior, negative-log-likelihood objectives for hyper-parameter
    fitting via L-BFGS-B, and a precision-weighted aggregation of
    distributed predictions.
    """

    def __init__(self, kernel_type='SE'):
        # Project-local collection of kernel implementations.
        self.kernel_obj = KernelFunctions()
        # Name of the kernel used by every covariance evaluation below.
        self.kernel_type = kernel_type

    def kernel(self, X1, X2, *args, **kwargs):
        """Evaluate the configured kernel between input sets X1 and X2.

        Extra positional/keyword arguments (e.g. ``length_scale``,
        ``sigma_f``) are forwarded unchanged to the kernel function.

        Raises:
            ValueError: if ``self.kernel_type`` names no known kernel.
        """
        if self.kernel_type == 'SE':
            return self.kernel_obj.SE_kernel(X1, X2, *args, **kwargs)
        elif self.kernel_type == 'ARD':
            return self.kernel_obj.ARD_kernel(X1, X2, *args, **kwargs)
        elif self.kernel_type == 'Linear':
            return self.kernel_obj.linear_kernel(X1, X2, *args, **kwargs)
        elif self.kernel_type == 'Polynomial':
            return self.kernel_obj.polynomial_kernel(X1, X2, *args, **kwargs)
        elif self.kernel_type == 'Matern':
            return self.kernel_obj.matern_kernel(X1, X2, *args, **kwargs)
        elif self.kernel_type == 'Periodic':
            return self.kernel_obj.periodic_kernel(X1, X2, *args, **kwargs)
        else:
            raise ValueError("Unknown kernel type")

    def posterior(
        self, X_s, X_train, Y_train, length_scale=1.0, sigma_f=0.10,
        sigma_y=1e-8
    ):
        """Return the GP posterior (mean, covariance) at test inputs X_s.

        Args:
            X_s: test inputs.
            X_train: training inputs.
            Y_train: training targets.
            length_scale, sigma_f: kernel hyper-parameters.
            sigma_y: observation-noise standard deviation.

        Returns:
            Tuple ``(mu_s, cov_s)`` — posterior mean and covariance at X_s.
        """
        K = self.kernel(
            X_train, X_train, length_scale=length_scale, sigma_f=sigma_f)
        K += sigma_y**2 * np.eye(len(X_train))
        K_s = self.kernel(
            X_train, X_s, length_scale=length_scale, sigma_f=sigma_f)
        K_ss = self.kernel(
            X_s, X_s, length_scale=length_scale, sigma_f=sigma_f)
        # BUG FIX: the jitter was computed but discarded before
        # (`K_ss + 1e-5 * np.eye(...)` with no assignment); apply it in
        # place so the test-point covariance stays well conditioned.
        K_ss += 1e-5 * np.eye(len(X_s))
        K_inv = inv(K)

        mu_s = K_s.T.dot(K_inv).dot(Y_train)
        cov_s = K_ss - K_s.T.dot(K_inv).dot(K_s)

        return mu_s, cov_s

    def distributed_posterior(
        self, X_s, X_train, Y_train, length_scale=1.0, sigma_f=0.10,
        sigma_y=1e-8
    ):
        """Compute an independent GP posterior for each local training set.

        Args:
            X_s: common test inputs.
            X_train: sequence of per-node training-input sets.
            Y_train: sequence of per-node training-target sets.
            length_scale, sigma_f: kernel hyper-parameters.
            sigma_y: observation-noise standard deviation.

        Returns:
            Tuple ``(mu_s, cov_s)`` of lists — one posterior mean (column
            vector) and one covariance per node.
        """
        mu_s = [None] * len(X_train)
        cov_s = [None] * len(X_train)

        for i in range(len(X_train)):
            K = self.kernel(
                X_train[i], X_train[i],
                length_scale=length_scale, sigma_f=sigma_f)
            K += sigma_y**2 * np.eye(len(X_train[i]))
            # Keyword arguments for consistency with posterior() above.
            K_s = self.kernel(
                X_train[i], X_s, length_scale=length_scale, sigma_f=sigma_f)
            K_ss = self.kernel(
                X_s, X_s, length_scale=length_scale, sigma_f=sigma_f)
            # Larger jitter than posterior() (1e-2 vs 1e-5), as in the
            # original implementation.
            K_ss += 1e-2 * np.eye(len(X_s))
            K_inv = inv(K)
            Y_train1 = np.array(Y_train[i]).ravel()

            mu_s[i] = K_s.T.dot(K_inv).dot(Y_train1).reshape(-1, 1)
            cov_s[i] = K_ss - K_s.T.dot(K_inv).dot(K_s)

        return mu_s, cov_s

    def plot_gp(self, mu, cov, Xt, X_train=None, Y_train=None, samples=None):
        """Plot the GP mean, a 95% confidence band, optional sample paths,
        and (if given) the training points on the current matplotlib axes.
        """
        # BUG FIX (idiom): avoid a mutable default argument.
        samples = [] if samples is None else samples
        Xt = Xt.ravel()
        mu = mu.ravel()
        # 95% band: 1.96 standard deviations from the diagonal variance.
        uncertainty = 1.96 * np.sqrt(np.diag(cov))

        plt.fill_between(Xt, mu + uncertainty, mu - uncertainty, alpha=0.1)
        plt.plot(Xt, mu, label='Mean')
        for i, sample in enumerate(samples):
            plt.plot(Xt, sample, lw=1, ls='--', label=f'Sample {i+1}')
        if X_train is not None:
            plt.plot(X_train, Y_train, 'rx')
        plt.legend()

    def nll_fn(self, X_train, Y_train, flag):
        """Return a negative-log-marginal-likelihood objective.

        Args:
            X_train, Y_train: training data; for ``flag='DGP'`` these are
                sequences of per-node sets, for ``flag='GP'`` single sets.
            flag: ``'GP'`` for a single objective, ``'DGP'`` for the sum of
                per-node objectives.

        Returns:
            A callable ``f(theta)`` where ``theta = (length_scale, sigma_f,
            sigma_y)``.

        Raises:
            ValueError: if ``flag`` is neither ``'GP'`` nor ``'DGP'``
            (previously this silently returned ``None``).
        """
        if flag == 'DGP':
            def nll_distributed(theta):
                # Sum of the stable (Cholesky-based) NLL over all nodes.
                object_fun = 0
                for i in range(len(X_train)):
                    Y_train1 = np.array(Y_train[i]).ravel()
                    K = self.kernel(
                        X_train[i], X_train[i], length_scale=theta[0],
                        sigma_f=theta[1]) + theta[2]**2 * np.eye(
                        len(X_train[i]))
                    # NOTE(review): 1e-1 is a large jitter — presumably
                    # needed for stability with these kernels; confirm.
                    K += np.eye(K.shape[0]) * 1e-1
                    L = cholesky(K)

                    # Solve K^{-1} y via two triangular solves.
                    S1 = solve_triangular(L, Y_train1, lower=True)
                    S2 = solve_triangular(L.T, S1, lower=False)

                    # log det term + data-fit term + normalisation constant.
                    fun = np.sum(np.log(np.diagonal(L)))
                    funa = 0.5 * Y_train1.dot(S2)
                    func = 0.5 * len(X_train[i]) * np.log(2 * np.pi)
                    object_fun += fun + funa + func
                return object_fun

            return nll_distributed
        elif flag == 'GP':
            def nll_stable(theta):
                # Numerically stable NLL using a Cholesky factorisation.
                Y_train1 = np.array(Y_train).ravel()
                K = self.kernel(
                    X_train, X_train, length_scale=theta[0],
                    sigma_f=theta[1]
                )
                K += theta[2]**2 * np.eye(len(X_train))
                K += np.eye(K.shape[0]) * 1e-1
                L = cholesky(K)

                S1 = solve_triangular(L, Y_train1, lower=True)
                S2 = solve_triangular(L.T, S1, lower=False)
                fun = np.sum(np.log(np.diagonal(L)))
                funa = 0.5 * Y_train1.dot(S2)
                funb = 0.5 * len(X_train) * np.log(2 * np.pi)
                fun += funa + funb
                return fun

            return nll_stable
        else:
            # BUG FIX: previously fell through and returned None, which
            # only failed later inside scipy.optimize.minimize.
            raise ValueError("flag must be 'GP' or 'DGP'")

    def fit(self, X_train, Y_train, flag):
        """Fit (length_scale, sigma_f, sigma_y) by minimising the NLL.

        Returns the scipy ``OptimizeResult`` from L-BFGS-B, started at
        ``[5, 5, 0.1]`` with all parameters bounded below by 1e-5.
        """
        return minimize(self.nll_fn(X_train, Y_train, flag), [5, 5, 0.1],
                        bounds=((1e-5, None), (1e-5, None), (1e-5, None)),
                        method='L-BFGS-B')

    def aggregation(self, mu_set, cov_set, X_t):
        """Fuse distributed predictions by precision (inverse-variance)
        weighting.

        Args:
            mu_set: list of per-node mean column vectors.
            cov_set: list of per-node covariance matrices (diagonals used).
            X_t: test inputs (only its length is used, for sizing).

        Returns:
            Tuple ``(mu1, s2)`` — fused mean and fused variance columns.
        """
        s2 = np.zeros((len(X_t), 1))
        # BUG FIX: `mu1 = s2` previously made mu1 an alias of s2, so the
        # in-place `s2 += ...` accumulation corrupted mu1 before it was
        # used; allocate an independent accumulator instead.
        mu1 = np.zeros((len(X_t), 1))

        # Accumulate total precision across all nodes and test points...
        for i in range(len(mu_set)):
            for j in range(len(mu_set[i])):
                s2 += 1 / cov_set[i][j, j]
        # ...then invert to get the fused variance.
        s2 = 1 / s2
        # Precision-weighted combination of the node means.
        for i in range(len(mu_set)):
            for j in range(len(mu_set[i])):
                mu1 += s2 * (mu_set[i] / cov_set[i][j, j])

        return mu1, s2
Loading