-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpnn_base.py
More file actions
288 lines (222 loc) · 10.8 KB
/
pnn_base.py
File metadata and controls
288 lines (222 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
"""
Base class for PNNs, to be imported elsewhere.
"""
from pathlib import Path
from shutil import rmtree
from time import time
from typing import Any, Iterable, Optional, Self
from zipfile import ZipFile, ZIP_DEFLATED
import dill as pickle
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Model, load_model
from sklearn.preprocessing import MinMaxScaler, RobustScaler
from .. import constants as c, data as d
XSCALER_FILENAME = "X.scaler"
YSCALER_FILENAME = "y.scaler"
### SAVING/LOADING
def timestamp() -> str:
    """
    Create a unique-ish string timestamp from time(), with the decimal point removed.

    Example: 1700000000.123456 -> "1700000000123456"
    """
    # Single expression: the original bound a local named `timestamp`, shadowing this function.
    return str(time()).replace(".", "")
def dump_into_zipfile(zipfile: ZipFile, filename: str, data, **kwargs) -> None:
    """
    Serialise `data` with pickle and store it under `filename` inside an already-open zipfile.

    Any additional **kwargs are forwarded to ZipFile.writestr.
    """
    payload = pickle.dumps(data)
    zipfile.writestr(filename, payload, compress_type=ZIP_DEFLATED, **kwargs)
def load_dump(filename: Path | str) -> Any:
"""
Load pickled data from a file.
"""
with open(filename, mode="rb") as file:
data = pickle.load(file)
return data
### LOSS FUNCTIONS
@tf.keras.utils.register_keras_serializable() # Enables saving/loading models with this custom loss function
def nll_loss(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
    """
    Negative Log Likelihood (NLL) loss function.

    `y_true` contains N reference values per row.
    `y_pred` contains N predicted mean values, followed by N predicted variances, per row:
        [mean1, mean2, ..., meanN, var1, var2, ..., varN]

    Returns the Gaussian negative log likelihood, averaged over all rows and outputs.
    """
    # Number of target variables; assumes y_true's second dimension is statically known -- TODO confirm under graph mode.
    N = y_true.shape[1]
    mean = y_pred[:, :N]
    var = tf.nn.softplus(y_pred[:, N:])  # Softplus keeps variances strictly positive whatever the raw outputs are.
    # 0.5 * (log var + (y - mean)^2 / var + log 2*pi), averaged over everything with reduce_mean.
    return tf.reduce_mean(0.5 * (tf.math.log(var) + (tf.square(y_true - mean) / var) + tf.math.log(2 * np.pi)))
### MAIN PNN CLASS
class BasePNN:
    """
    Base class for probabilistic neural networks (PNNs).

    Wraps a Keras Model (or an iterable of Models, e.g. for ensembles) together
    with optional scalers for the inputs (X) and targets (y), and provides
    training, ZIP-based saving/loading, and uncertainty-aware prediction.
    """
    ### CONFIGURATION
    name = "BasePNN"  # Display name, used in __repr__
    extension = ".keras"  # Default extension for model itself

    def __init__(self, model: Model | Iterable[Model], *,
                 scaler_X: Optional[RobustScaler]=None, scaler_y: Optional[MinMaxScaler]=None) -> None:
        """
        Initialisation with just a Model so it can be used for training new models or loading from file.
        Scalers for X and y are optional.
        """
        self.model = model
        self.scaler_X = scaler_X
        self.scaler_y = scaler_y

    def __repr__(self) -> str:
        return f"{self.name}: {self.model}"

    ### DATA RESCALING
    def scale_X(self, X: np.ndarray) -> np.ndarray:
        """
        Rescale X using the included scaler.
        Raises AssertionError if no X scaler is attached or the feature dimension does not match it.
        """
        assert self.scaler_X is not None, f"Model `{self}` does not have a rescaler for X."
        assert (self_shape := self.scaler_X.center_.shape[0]) == (data_shape := X.shape[-1]), f"Data ({data_shape}) and scaler ({self_shape}) have incompatible shapes."
        return d.scale_X(self.scaler_X, X)

    def scale_y(self, y: np.ndarray) -> np.ndarray:
        """
        Rescale log(y) using the included scaler.
        Raises AssertionError if no y scaler is attached or the target dimension does not match it.
        """
        assert self.scaler_y is not None, f"Model `{self}` does not have a rescaler for y."
        assert (self_shape := self.scaler_y.data_range_.shape[0]) == (data_shape := y.shape[-1]), f"Data ({data_shape}) and scaler ({self_shape}) have incompatible shapes."
        return d.scale_y(self.scaler_y, y)

    def inverse_scale_y(self, mean: np.ndarray, variance: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """
        Rescale network outputs (means and variances) back to original units using the included scaler.
        """
        # Consistency with scale_X/scale_y: fail with a clear message rather than an AttributeError on None.
        assert self.scaler_y is not None, f"Model `{self}` does not have a rescaler for y."
        assert (self_shape := self.scaler_y.data_range_.shape[0]) == (mean_shape := mean.shape[-1]) == (var_shape := variance.shape[-1]), f"Mean ({mean_shape}), variance ({var_shape}), and scaler ({self_shape}) have incompatible shapes."
        return d.inverse_scale_y(self.scaler_y, mean, variance)

    ### CREATION
    @classmethod
    def build(cls, input_shape: tuple, output_size: int, *args, **kwargs) -> Self:
        """
        Build the underlying model.
        To be overridden by subclasses.
        """
        return NotImplemented

    def train(self, X_train: np.ndarray, y_train: np.ndarray, *,
              epochs: int=1000, batch_size: int=32, learning_rate: float=0.001, validation_split: float=0.1, **kwargs) -> None:
        """
        Train on the provided X and y data, with early stopping on the validation loss.
        **kwargs are passed to self.model.fit.
        """
        # Data scaling (only when the corresponding scaler is attached)
        X_train = self.scale_X(X_train) if self.scaler_X is not None else X_train
        y_train = self.scale_y(y_train) if self.scaler_y is not None else y_train

        # Setup
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        early_stopping = EarlyStopping(monitor="val_loss", patience=80, verbose=1, mode="min", restore_best_weights=True)

        # Training; the History object returned by fit was previously bound but never used, so it is discarded.
        self.model.compile(optimizer=optimizer, loss=nll_loss)
        self.model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_split=validation_split, callbacks=[early_stopping], **kwargs)

    @classmethod
    def build_and_train(cls, X_train: np.ndarray, y_train: np.ndarray, *,
                        scaler_X: Optional[RobustScaler]=None, scaler_y: Optional[MinMaxScaler]=None,
                        build_kwargs: Optional[dict]=None, **train_kwargs) -> Self:
        """
        Build and train a model on the provided X and y data, with early stopping.
        Convenience function combining the build and train functions.
        `build_kwargs` are forwarded to cls.build; all remaining keyword arguments go to train.
        """
        # Avoid the mutable-default-argument pitfall: a shared `{}` default would persist between calls.
        if build_kwargs is None:
            build_kwargs = {}

        # Setup: infer shapes from the training data
        input_shape = X_train.shape[1:]
        output_size = y_train.shape[-1]
        newpnn = cls.build(input_shape, output_size, scaler_X=scaler_X, scaler_y=scaler_y, **build_kwargs)
        newpnn.train(X_train, y_train, **train_kwargs)
        return newpnn

    ### SAVING / LOADING
    def _save_model(self, filename: Path | str, *args, **kwargs) -> None:
        """
        Save the underlying model to `filename`; can be overridden while maintaining the general save function.
        (Returns None; the previous docstring's claim that it returns a Path was incorrect.)
        """
        filename = Path(filename)
        self.model.save(filename, *args, **kwargs)

    def save(self, filename: Path | str, **kwargs) -> None:
        """
        Save the full model into a ZIP file, including scalers (which can be None).
        """
        filename = Path(filename)

        # Save the underlying model to a temporary file next to the target
        model_filename = filename.parent/(f"temp_{timestamp()}" + self.extension)  # Temporary filename
        self._save_model(model_filename)

        # Bundle all components into the ZIP file
        with ZipFile(filename, mode="w") as zipfile:
            # Move the underlying model file into the ZIP: keep the extension, drop parent folders
            model_filename_zip = "model" + self.extension
            zipfile.write(model_filename, model_filename_zip, compress_type=ZIP_DEFLATED)

            # Pickle the scalers (a None scaler is pickled too, so load() always finds both entries)
            dump_into_zipfile(zipfile, XSCALER_FILENAME, self.scaler_X)
            dump_into_zipfile(zipfile, YSCALER_FILENAME, self.scaler_y)

        # Delete the temporary model file
        model_filename.unlink()

    @classmethod
    def _load_model(cls, filename: Path | str, *args, **kwargs) -> Model | Iterable[Model]:
        """
        Load the underlying model; can be overridden while maintaining the general load function.
        Assumes the model has already been unzipped (but can itself be another zip file).
        """
        return load_model(filename, *args, **kwargs)

    @classmethod
    def load(cls, filename: Path | str, *args, **kwargs) -> Self:
        """
        Load a model from a ZIP file, including scalers (which can be None).
        """
        filename = Path(filename)
        temp_folder = Path(f"temp_{timestamp()}")  # Temporary extraction target

        # Open the ZIP file and pull out its components
        model_filename = "model" + cls.extension
        with ZipFile(filename, mode="r") as zipfile:
            # Extract the model file into the temporary folder
            zipfile.extract(model_filename, temp_folder)

            # Load the pickled scalers directly from the ZIP.
            # NOTE: unpickling executes arbitrary code; only load files from trusted sources.
            with zipfile.open(XSCALER_FILENAME) as file:
                scaler_X = pickle.load(file)
            with zipfile.open(YSCALER_FILENAME) as file:
                scaler_y = pickle.load(file)

        # Load the extracted model; always remove the temporary folder, even if loading fails
        try:
            model = cls._load_model(temp_folder/model_filename)
        finally:
            rmtree(temp_folder)

        return cls(model, scaler_X=scaler_X, scaler_y=scaler_y)

    ### APPLICATION
    def _predict_samples(self, X: np.ndarray, **kwargs) -> np.ndarray:
        """
        Use the model to predict y values for X.
        Subclasses typically override this to return multiple stochastic samples.
        """
        return self.model.predict(X, **kwargs)

    def predict_with_uncertainty(self, X: np.ndarray, **predict_kwargs) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Predict y values for the given X values, including rescaling back to regular units.

        Returns (mean, total_variance, aleatoric_variance, epistemic_variance).
        NOTE(review): the axis-0 aggregation below assumes _predict_samples returns an array of
        shape (n_samples, n_rows, 2N), as DropoutPNN does -- confirm for other subclasses.
        """
        # Data scaling
        X = self.scale_X(X) if self.scaler_X is not None else X

        # Generate predictions in scaled space
        pred_samples = self._predict_samples(X, **predict_kwargs)

        # Separate predicted means and predicted variances
        N = pred_samples.shape[-1] // 2  # Number of predicted mean parameters
        mean_predictions_scaled = pred_samples[..., :N]
        raw_variances_scaled = pred_samples[..., N:]
        variance_predictions_scaled = tf.nn.softplus(raw_variances_scaled)  # Same positivity transform as in nll_loss

        # Convert from scaled space to real units
        if self.scaler_y is not None:
            mean_predictions, variance_predictions = self.inverse_scale_y(mean_predictions_scaled, variance_predictions_scaled)
        else:
            mean_predictions, variance_predictions = mean_predictions_scaled, variance_predictions_scaled

        # Aleatoric = average of the predicted variances; epistemic = spread of the predicted means
        aleatoric_variance = np.mean(variance_predictions, axis=0)
        epistemic_variance = np.var(mean_predictions, axis=0)
        total_variance = aleatoric_variance + epistemic_variance
        mean_predictions = np.mean(mean_predictions, axis=0)  # Average over n_samples

        return mean_predictions, total_variance, aleatoric_variance, epistemic_variance
### DROPOUT/DROPCONNECT VERSION, FOR CONVENIENCE
class DropoutPNN(BasePNN):
    """
    PNN variant that keeps dropout/dropconnect active at inference time and
    aggregates repeated stochastic forward passes (Monte Carlo dropout).
    """
    ### APPLICATION
    @tf.function  # Compiling the forward pass gives roughly a 4x speed-up
    def _predict_with_dropout(self, X: np.ndarray):
        # `training=True` only switches the dropout layers on; it does not alter the model parameters.
        return self.model(X, training=True)

    def _predict_samples(self, X: np.ndarray, *, n_samples: int=100) -> np.ndarray:
        """
        Predict y values for the given X values using dropout/dropconnect,
        stacking `n_samples` stochastic forward passes along a new leading axis.
        """
        draws = [self._predict_with_dropout(X).numpy() for _ in range(n_samples)]
        return np.array(draws)