import math
import itertools
import random
import logging
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import timeit
# from IPython.display import display, HTML
from Environment import Environment, DissipationModelEnvironment, EpidemicSpreadEnvironment
logging.basicConfig(level=logging.WARNING)
class InformationModel:
    """The ancestor of all information models. It defines the functions that
    can be called on an information model; subclasses specialize them for the
    different kinds of models."""
def __init__(self, width, height):
self.width, self.height = width, height
    def score(self, env: Environment):
        """Calculates a score that estimates the quality of this information
        model in modeling the specified environment. Delegates to the
        module-level im_score function (negated average absolute error)."""
        return im_score(self, env)
    def add_observation(self, observation: dict):
        """Adds an observation, given as a dictionary with fields such as
        value, x, y and timestamp. Different implementations do different
        things with these observations (store them, use them right away to
        update the model, etc.)"""
        pass
    def proceed(self, delta_t: float):
        """Advances the information model by delta_t. The general assumption
        is that after calling this the estimates are pre-computed and ready
        to be queried. Some implementations might also model the evolution
        of the system."""
        pass
class StoredObservationIM(InformationModel):
    """An information model that stores a series of observations.
    Observations are dictionaries whose keys are the constants defined
    below."""
VALUE = "value"
X = "x"
Y = "y"
LOCATION = "location"
TIME = "time"
CONFIDENCE = "confidence"
RANGE = "range"
def __init__(self, width, height):
super().__init__(width, height)
self.observations = []
def add_observation(self, observation):
"""The simplest way to add an observation is that we just record it."""
self.observations.append(observation)
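# A minimal usage sketch (illustration only, not part of the original code):
# it shows the observation-dictionary format that StoredObservationIM and its
# descendants consume. The coordinates and value below are made-up numbers.
def _example_stored_observation():
    im = StoredObservationIM(10, 10)
    obs = {StoredObservationIM.X: 3, StoredObservationIM.Y: 7,
           StoredObservationIM.VALUE: 0.42, StoredObservationIM.TIME: 1.0}
    im.add_observation(obs)
    return im.observations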
class AbstractScalarFieldIM(StoredObservationIM):
    """An abstract information model for scalar fields that keeps, for each
    point, the estimated value and an uncertainty metric. The uncertainty
    metric is an estimate of the error at the given location."""
    def __init__(self, width, height):
        """Initializes the value to zero and the uncertainty to one.
        FIXME: clarify what exactly the uncertainty measures."""
        super().__init__(width, height)
        self.value = np.zeros((self.width, self.height))
        self.uncertainty = np.ones((self.width, self.height))
    def proceed(self, delta_t):
        """Proceeds a step in time. At the current point this simply performs
        an estimation based on the stored observations. None of the currently
        used estimators can make predictions, but this might change later."""
        self.value, self.uncertainty = self.estimate(self.observations, None, None)
    def estimate(self, observations, prior_value=None, prior_uncertainty=None):
        """Performs the estimate for every point in the environment. Returns
        the posterior value and uncertainty as arrays covering every point in
        the environment. The observations passed in are the ones that have
        not been integrated into the prior."""
        raise NotImplementedError("estimate() must be implemented in a subclass.")
    def estimate_voi(self, observation):
        """The value of information (VOI) of an observation is the reduction
        of the total uncertainty it brings.
        FIXME: this can be made different for the GP."""
        _, uncertainty = self.estimate(self.observations, None, None)
        observations_new = self.observations.copy()
        observations_new.append(observation)
        _, uncertainty_new = self.estimate(observations_new, None, None)
        return np.sum(np.abs(uncertainty - uncertainty_new))
class GaussianProcessScalarFieldIM(AbstractScalarFieldIM):
    """An information model for scalar fields where the estimation is done
    with a Gaussian process regressor."""
    def __init__(self, width, height, gp_kernel = None):
        super().__init__(width, height)
        self.gp_kernel = gp_kernel
    def estimate(self, observations, prior_value, prior_uncertainty):
        # calculate the estimate with a Gaussian process
        if prior_value is not None or prior_uncertainty is not None:
            raise Exception("GaussianProcessScalarFieldIM cannot handle priors")
        est = np.zeros([self.width, self.height])
        stdmap = np.zeros([self.width, self.height])
        if len(observations) == 0:
            return est, stdmap
        X = []
        Y = []
        for obs in observations:
            # Unclear whether this rounding matters.
            X.append([round(obs[self.X]), round(obs[self.Y])])
            Y.append(obs[self.VALUE])
        # fit the Gaussian process, using the kernel passed to the
        # constructor if there is one, and a default RBF + white-noise
        # kernel otherwise
        if self.gp_kernel is not None:
            kernel = self.gp_kernel
        else:
            kernel = RBF(length_scale = [2.0, 2.0], length_scale_bounds = [1, 10]) + WhiteKernel(noise_level=0.5)
        gpr = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=5)
        gpr.fit(X, Y)
        # query the posterior mean and standard deviation at every point
        X = np.array(list(itertools.product(range(self.width), range(self.height))))
        Y, std = gpr.predict(X, return_std = True)
        for i, idx in enumerate(X):
            est[idx[0], idx[1]] = Y[i]
            stdmap[idx[0], idx[1]] = std[i]
        return est, stdmap
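# A minimal usage sketch of the GP model (illustration only; the observation
# coordinates and values below are made up). It fits the Gaussian process on
# three observations and exposes the full posterior mean and standard
# deviation maps through value and uncertainty.
def _example_gaussian_process_im():
    im = GaussianProcessScalarFieldIM(10, 10)
    for x, y, v in [(1, 1, 0.2), (5, 5, 1.0), (8, 2, 0.5)]:
        im.add_observation({im.X: x, im.Y: y, im.VALUE: v, im.TIME: 0})
    im.proceed(1)
    return im.value, im.uncertainty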
class PointEstimateScalarFieldIM(AbstractScalarFieldIM):
    """An information model which performs a point-based estimation. At the
    precise points where we have an observation the uncertainty is zero,
    while everywhere else the uncertainty is 1.00."""
    def __init__(self, width, height):
        super().__init__(width, height)
    def estimate(self, observations, prior_value, prior_uncertainty):
        """Takes all the observations and estimates the value and the
        uncertainty. This processes all the observations, ignores the
        timestamps, and assumes that each observation refers only to the
        point where it was taken."""
        if prior_value is not None:
            value = np.copy(prior_value)
        else:
            value = np.zeros((self.width, self.height))
        if prior_uncertainty is not None:
            uncertainty = np.copy(prior_uncertainty)
        else:
            uncertainty = np.ones((self.width, self.height))
        for obs in observations:
            value[obs[self.X], obs[self.Y]] = obs[self.VALUE]
            uncertainty[obs[self.X], obs[self.Y]] = 0
        return value, uncertainty
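# A minimal sketch (illustration only) showing the point estimator and the
# value-of-information computation on a tiny 4x4 grid with made-up values.
def _example_point_estimate_im():
    im = PointEstimateScalarFieldIM(4, 4)
    im.add_observation({im.X: 0, im.Y: 0, im.VALUE: 1.0})
    im.proceed(1)
    # a new observation at an unvisited point reduces uncertainty there by 1
    voi = im.estimate_voi({im.X: 2, im.Y: 2, im.VALUE: 0.5})
    return im.value, im.uncertainty, voi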
class DiskEstimateScalarFieldIM(AbstractScalarFieldIM):
    """An information model which performs a disk-based estimation: each
    observation sets the value inside a disk around its location."""
    def __init__(self, width, height, disk_radius=5, default_value=0):
        super().__init__(width, height)
        self.disk_radius = disk_radius
        self.default_value = default_value
        self.mask = None
    def estimate(self, observations, prior_value, prior_uncertainty, radius=None):
        """Estimates the field by marking a disk of radius r around each
        observation with the observed value. If no fixed radius was given,
        r is calculated dynamically so that the disks together achieve about
        2x the coverage of the area: sqrt((height * width * 2) / (pi * n)).
        Later disks overwrite earlier disks.
        FIXME: areas with no coverage have uncertainty 1, while areas covered
        by a disk have uncertainty 0."""
        value = np.full((self.width, self.height), self.default_value, dtype=np.float64)
        uncertainty = np.ones((self.width, self.height), dtype=np.float64)
        if radius is None:
            if self.disk_radius is None:
                radius = int(1 + math.sqrt((self.height * self.width * 2) / (math.pi * max(1, len(observations)))))
            else:
                radius = self.disk_radius
        # create a boolean mask holding a disk of the given radius
        self.maskdim = 2 * radius + 1
        self.mask = np.full((self.maskdim, self.maskdim), False, dtype=bool)
        for i in range(-radius, radius + 1):
            for j in range(-radius, radius + 1):
                if i * i + j * j <= radius * radius:
                    self.mask[i + radius, j + radius] = True
        for obs in observations:
            self.apply_value_mask(value, uncertainty, obs[self.X], obs[self.Y], obs[self.VALUE])
        return value, uncertainty
    def apply_value_mask(self, value, uncertainty, x, y, new_value):
        """Applies the value using a mask-based approach: the circular mask
        is shifted so that it is centered on the observation location, and
        clipped where it overhangs the margins of the environment."""
        dimx = int(self.width)
        dimy = int(self.height)
        radius = (self.maskdim - 1) // 2
        mask2 = np.full((dimx, dimy), False, dtype=bool)
        maskp = self.mask[max(0, radius - x):min(self.maskdim, dimx - x + radius),
                          max(0, radius - y):min(self.maskdim, dimy - y + radius)]
        mask2[max(0, x - radius):min(dimx, x + radius + 1),
              max(0, y - radius):min(dimy, y + radius + 1)] = maskp
        # assign the new value and zero uncertainty to the area covered by the mask
        value[mask2] = new_value
        uncertainty[mask2] = 0.0
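# A minimal sketch (illustration only) of the disk estimator, including the
# dynamic radius arithmetic: for a 100x100 area and 150 observations the
# radius would be int(1 + sqrt(20000 / (pi * 150))) = int(1 + 6.51) = 7.
def _example_disk_estimate_im():
    im = DiskEstimateScalarFieldIM(20, 20, disk_radius=3)
    im.add_observation({im.X: 10, im.Y: 10, im.VALUE: 2.0})
    im.proceed(1)
    # the disk of radius 3 around (10, 10) now holds 2.0 with uncertainty 0
    return im.value, im.uncertainty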
def im_score(im, env):
    """Scores the information model as the negated average absolute
    difference between the model's prediction and the real values in the
    environment (higher is better, with a maximum of 0)."""
    return -np.mean(np.abs(env.value - im.value))
def im_score_rmse_scikit(im, env):
"""Scores the information model by finding the RMSE between the information model values and the real values in the environment
FIXME: I think that this is not working very well for 2D areas. On the other hand, it has built-in weights
"""
return -mean_squared_error(env.value, im.value, squared=False)
def im_score_rmse(im, env):
"""Scores the information model by finding the RMSE between the information model values and the real values in the environment"""
se = (env.value - im.value) ** 2
return -np.sqrt(np.mean(se))
def im_score_rmse_weighted(im, env, weightmap):
    """Scores the information model with an RMSE weighted by the weightmap."""
    se = (env.value - im.value) ** 2
    weightedse = np.multiply(weightmap, se)
    weightedval = np.sqrt(np.sum(weightedse) / np.sum(weightmap))
    return -weightedval
def im_score_weighted(im, env, weightmap):
"""Scores the information model by finding the average absolute difference between the prediction of the information model and the real values in the environment.
The weightmap must be an array of the same size as the im value, and it must have its values between 0 (not interested) and 1 (interested)"""
wm = weightmap / np.mean(weightmap)
abserror = np.abs(env.value - im.value)
weightederror = np.multiply(wm, abserror)
return -np.mean(weightederror)
def im_score_weighted_asymmetric(im, env, weight_positive, weight_negative, weightmap):
    """Scores the information model by finding the average absolute
    difference between the prediction of the information model and the real
    values in the environment.
    The weightmap must be an array of the same size as the im value, with
    values between 0 (not interested) and 1 (interested).
    Weights positive errors (where the im is larger than env) and negative
    errors (where env is larger than im) differently."""
    wm = weightmap / np.mean(weightmap)
    error_positive = np.multiply(wm * weight_positive, np.maximum(im.value - env.value, 0))
    error_negative = np.multiply(wm * weight_negative, np.maximum(env.value - im.value, 0))
    return -np.mean(np.add(error_positive, error_negative))
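# A minimal sketch (illustration only) comparing the symmetric and asymmetric
# weighted scores on made-up 2x2 data; overestimates are penalized 3x more
# than underestimates in the asymmetric variant. The _Holder class is a
# hypothetical stand-in with just the value field the scores need.
def _example_scores():
    class _Holder:
        pass
    im, env = _Holder(), _Holder()
    im.value = np.array([[1.0, 1.0], [2.0, 2.0]])
    env.value = np.array([[2.0, 2.0], [1.0, 1.0]])
    w = np.ones((2, 2))
    return (im_score_weighted(im, env, w),
            im_score_weighted_asymmetric(im, env, 3.0, 1.0, w))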
## Functions used for the visualization
def visualizeEnvAndIM(ax_env, ax_im_value, ax_im_uncertainty, env, im):
    score = im.score(env)
    ax_env.set_title("Environment")
    ax_env.imshow(env.value, vmin=0, vmax=5.0)
    ax_im_value.set_title(f"IM.value sc={score:0.2}")
    ax_im_value.imshow(im.value, vmin=0, vmax=5.0)
    ax_im_uncertainty.set_title("IM - uncertainty")
    ax_im_uncertainty.imshow(im.uncertainty, vmin=0)
if __name__ == "__main__":
    if True:
        ## Trying out the different types of scores to see whether they work
        ## correctly; this should probably become a unit test
        env = Environment("foo", 2, 2)
        env.value = np.array([[2, 2], [1, 1]])
        im = InformationModel(2, 2)
        im.value = np.array([[1, 1], [2, 2]])
        score = im_score(im, env)
        print(score)
    if False:
        # create an environment to observe
        env = DissipationModelEnvironment("water", 100, 100, seed=1)
        env.evolve_speed = 1
        env.p_pollution = 0.1
        for t in range(120):
            env.proceed()
        # plt.imshow(env.value, vmin=0, vmax=1.0)
        im = DiskEstimateScalarFieldIM(env.width, env.height)
        # im = GaussianProcessScalarFieldIM(env.width, env.height)
        # generate a series of random observations
        scores = []
        times = []
        for i in range(150):
            x = random.randint(0, env.width-1)
            y = random.randint(0, env.height-1)
            value = env.value[x, y]
            obs = {"x": x, "y": y, "value": value}
            im.add_observation(obs)
            time = timeit.timeit("im.proceed(1)", number=1, globals=globals())
            times.append(time)
            scores.append(im.score(env))
fig, ((ax_env, ax_im_value, ax_im_uncertainty, ax_score, ax_time)) = plt.subplots(1,5, figsize=(15,3))
visualizeEnvAndIM(ax_env, ax_im_value, ax_im_uncertainty, env, im)
ax_score.set_title("Score (avg)")
ax_score.plot(scores)
ax_time.set_title("Estimation time (sec)")
ax_time.plot(times)
plt.tight_layout()
plt.show()