-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathmodels.py
More file actions
382 lines (334 loc) · 13.4 KB
/
models.py
File metadata and controls
382 lines (334 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Defines concrete strength and global warming potential (GWP) models.
"""
from __future__ import annotations
from typing import List, Optional, Union
import torch
from botorch import fit_gpytorch_mll
from botorch.models import ModelList, ModelListGP, SingleTaskGP
from botorch.models.model import Model
from botorch.models.transforms.input import (
AffineInputTransform,
ChainedInputTransform,
Log10,
Normalize,
)
from botorch.models.transforms.outcome import Standardize
from botorch.posteriors import Posterior
from botorch.utils.constraints import LogTransformedInterval
from gpytorch import ExactMarginalLogLikelihood
from gpytorch.kernels import LinearKernel, MaternKernel, RBFKernel, ScaleKernel
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.means import ZeroMean
from gpytorch.models import ExactGP
from torch import Tensor
from utils import get_day_zero_data, SustainableConcreteDataset
class SustainableConcreteModel(object):
def __init__(
self,
strength_days: List[int],
strength_model: Model | None = None,
gwp_model: Model | None = None,
d: int | None = None,
):
"""A multi-output model that jointly predicts GWP and compressive strength at
pre-defined days `strength_days`.
Args:
strength_days (List[int]): A list days to predict stength for.
strength_model (Optional[Model], optional): The strength model. Defaults to None.
gwp_model (Optional[Model], optional): The GWP model. Defaults to None.
d (Optional[int], optional): The dimensionality of the input to the strength model.
Is inferred automatically if the fit functions are called. NOTE: The model
assumes that the last element of the input corresponds to the time dimension.
"""
self.strength_days = strength_days
self.strength_model = None
self.gwp_model = None
self.d = d
def fit_strength_model(
self, data: SustainableConcreteDataset, use_fixed_noise: bool = False
) -> SingleTaskGP:
"""Fits the strength model to the given `data`. Upon completion, the model
can be accessed with the `strength_model` attribute.
Args:
data: A SustainableConcreteDataset containing the strength data.
use_fixed_noise: Toggles the use of known observation variances.
Returns:
The fitted strength model.
"""
X, Y, Yvar, X_bounds = data.strength_data
self._set_d(X.shape[-1])
self.strength_model = fit_strength_gp(
X=X, Y=Y, Yvar=Yvar, X_bounds=X_bounds, use_fixed_noise=use_fixed_noise
)
return self.strength_model
def fit_gwp_model(
self, data: SustainableConcreteDataset, use_fixed_noise: bool = False
) -> SingleTaskGP:
"""Fits the global warming potential (GWP) model to the given `data`.
Upon completion, the model can be accessed with the `gwp_model` attribute.
Args:
data: A SustainableConcreteDataset containing the GWP data.
use_fixed_noise: Toggles the use of known observation variances.
Returns:
The fitted GWP model.
"""
X, Y, Yvar, X_bounds = data.gwp_data
self._set_d(X.shape[-1] + 1)
self.gwp_model = fit_gwp_gp(
X=X, Y=Y, Yvar=Yvar, X_bounds=X_bounds, use_fixed_noise=use_fixed_noise
)
return self.gwp_model
def _set_d(self, d: int) -> None:
if self.d is None:
self.d = d
def get_model_list(self) -> ModelListGP:
"""Returns a ModelListGP modeling the GWP and compressive strength objectives as a function
of composition only.
Converts the strength and gwp models into a model list of independent models for gwp,
and x-day strengths, by fixing the time input of the strength model at 1 and 28 days.
"""
if self.d is None:
raise ValueError("Model not fit yet.")
models = [
self.gwp_model,
*(
FixedFeatureModel(
base_model=self.strength_model,
dim=self.d,
indices=[self.d - 1],
values=[day],
)
for day in self.strength_days
),
]
model = ModelList(*models)
return model # for use with multi-objective optimization
def plot_strength_curve(self, composition: Tensor, max_day: int = 28) -> None:
time = torch.arange(max_day + 1)
composition = composition.unsqueeze(0).expand(len(time))
# IDEA: use FixedFeatureModel?
# BatchedMultiOutputGPyTorchModel, ExactGP
class FixedFeatureModel(Model):
# advantage: only need to implement posterior for it to work with qNEHI
# disadvantage: makes the strength outputs independent (IDEA: could add joint model)
# TODO: check that these are appended before the InputTransforms are applied, not after.
def __init__(
self,
base_model: Model,
dim: int,
indices: Union[List[int], Tensor],
values: Union[List[float], Tensor],
):
"""A wrapper around a GP model that fixes some inputs to specific values.
Args:
base_model: The base model to wrap.
dim: The input dimensionality of the FixedFeatureModel. This is usually the
input dimensionality of base_model minus the number of fixed features.
indices: The indices of the inputs to fix.
values: The values to fix the inputs to.
Raises:
ValueError: If indices and values do not have the same length.
"""
super().__init__()
self.base_model = base_model
if len(indices) != len(values):
raise ValueError("indices and values do not have the same length.")
values = torch.as_tensor(values)
self._dim = dim
self._indices: Tensor = torch.as_tensor(indices)
self._fixed = torch.tensor(
[i in indices for i in torch.arange(dim, dtype=self._indices.dtype)]
)
self._values = values
def _add_fixed_features(self, X: Tensor) -> Tensor:
"""
Args:
X: A `n x d`-dim Tensor.
Returns:
A `n x (d + len(self._indices))`-dim Tensor.
"""
tkwargs = {"dtype": X.dtype, "device": X.device}
Z = torch.zeros(*X.shape[:-1], X.shape[-1] + len(self._indices), **tkwargs)
Z[..., self._fixed] = self._values
Z[..., ~self._fixed] = X
return Z
def forward(self, X: Tensor, *args, **kwargs) -> Tensor:
"""The forward method of the FixedFeatureModel, based on the forward method of
the base model with the fixed features added.
Args:
X: The `batch_shape x d`-dim input Tensor.
Returns:
The `batch_shape x m`-dim output Tensor.
"""
return self.base_model.forward(self._add_fixed_features(X), *args, **kwargs)
def posterior(self, X: Tensor, *args, **kwargs) -> Posterior:
"""Computes the posterior of the FixedFeatureModel, based on the posterior of
the base model with the fixed features added.
Args:
X: The `batch_shape x d`-dim input Tensor.
Returns:
The posterior of the FixedFeatureModel evaluated at `X`.
"""
return self.base_model.posterior(self._add_fixed_features(X), *args, **kwargs)
@property
def num_outputs(self) -> int:
return self.base_model.num_outputs # need to adjust if we batch fixed features
def subset_output(self, idcs: List[int]) -> FixedFeatureModel:
raise FixedFeatureModel(
base_model=self.base_model.subset_output(idcs),
dim=self._dim,
indices=self._indices,
values=self._value,
)
def fit_gwp_gp(
X: Tensor,
Y: Tensor,
Yvar: Tensor,
X_bounds: Tensor | None = None,
use_fixed_noise: bool = False,
) -> SingleTaskGP:
"""Fits a Gaussian process model to the given global warming potential (GWP) data.
Args:
X: `n x d`-dim Tensor of composition inputs without time.
Y: `n x 1`-dim Tensor of GWP values.
Yvar: `n x 1`-dim Tensor of GWP variances.
Returns:
A SingleTaskGP model fit to the data.
"""
d_out = Y.shape[-1]
if d_out != 1:
raise ValueError("Output dimensions is not one in gwp fitting.")
# GWP is a linear function of the inputs
covar_module = LinearKernel()
# removing any input and outcome transforms, as well as the prior mean from
# the model to force it to be homogenuous, i.e. it has no offset.
model_kwargs = {
"train_X": X,
"train_Y": Y,
"mean_module": ZeroMean(),
"covar_module": covar_module,
"input_transform": None,
"outcome_transform": None,
}
if use_fixed_noise:
model_kwargs["train_Yvar"] = Yvar
else:
model_kwargs["likelihood"] = GaussianLikelihood( # pyre-ignore
noise_constraint=LogTransformedInterval(1e-4, 1.0, initial_value=1e-2)
)
model = SingleTaskGP(**model_kwargs) # pyre-ignore
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_mll(mll)
return model
def fit_strength_gp(
X: Tensor,
Y: Tensor,
Yvar: Tensor,
X_bounds: Tensor | None = None,
use_fixed_noise: bool = False,
) -> ExactGP:
"""Fits a Gaussian process model to the given strength data.
IDEAS:
- Features:
- w / b ratio
- maturity i.e. sum_i(max(0, temperature_i) * delta_time_i)
- Kernels:
- Try orthogonal additive kernel again
- temperature modeling via additive kernel?
Args:
X: Tensor of composition inputs including time (n x d).
Y: Tensor of strength values (n x 1).
Yvar: Tensor of strength variances (n x 1).
Returns:
A SingleTaskGP model fit to the strength data.
"""
d_in = X.shape[-1]
d_out = Y.shape[-1]
if d_out != 1:
raise ValueError("Output dimensions is not one in strength curve fitting.")
# add data to condition GP to be zero at day zero
X_0, Y_0, Yvar_0 = get_day_zero_data(X=X, bounds=X_bounds, n=128)
X = torch.cat((X, X_0), dim=0)
Y = torch.cat((Y, Y_0), dim=0)
Yvar = torch.cat((Yvar, Yvar_0), dim=0)
# joint kernel to model all interactions
base_kernel = MaternKernel(
nu=2.5,
ard_num_dims=d_in,
lengthscale_constraint=LogTransformedInterval(1e-2, 1e3, initial_value=1.0),
lengthscale_prior=None,
)
scaled_base_kernel = ScaleKernel(
base_kernel=base_kernel,
outputscale_constraint=LogTransformedInterval(1e-2, 1e2, initial_value=1.0),
outputscale_prior=None,
)
# additive kernel to model behavior w.r.t. time
# try matern?
time_kernel = RBFKernel( # MaternKernel # smoother RBF seems to work better for additive components
active_dims=torch.tensor([d_in - 1]), # last dimension is time
ard_num_dims=1,
lengthscale_constraint=LogTransformedInterval(1e-2, 1e3, initial_value=1.0),
lengthscale_prior=None,
)
scaled_time_kernel = ScaleKernel(
base_kernel=time_kernel,
outputscale_constraint=LogTransformedInterval(1e-2, 1e2, initial_value=1.0),
outputscale_prior=None,
# batch_shape=batch_shape,
)
# IDEA: + scaled_water_kernel and other additive components
kernel = scaled_base_kernel + scaled_time_kernel
model_kwargs = {
"train_X": X,
"train_Y": Y,
"covar_module": kernel,
"input_transform": get_strength_gp_input_transform(d=d_in, bounds=X_bounds),
"outcome_transform": Standardize(d_out),
}
if use_fixed_noise:
model_kwargs["train_Yvar"] = Yvar
else:
model_kwargs["likelihood"] = GaussianLikelihood(
noise_constraint=LogTransformedInterval(1e-6, 1.0, initial_value=1e-1)
)
model = SingleTaskGP(**model_kwargs)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_mll(mll)
return model
def get_strength_gp_input_transform(
d: int, bounds: Optional[Tensor]
) -> ChainedInputTransform:
"""Chains a log(time + 1) and Normalize transform on d dimensional input data,
with the provided bounds.
Args:
bounds: `2 x d` tensor of lower and upper bounds for each dimension.
Returns:
A ChainedInputTransform that log-transforms the time dimension and subsequently
normalizes all dimensions to the unit hyper-cube.
"""
time_index = [d - 1]
tf1 = AffineInputTransform( # adds one to time dimension before taking log
d,
coefficient=torch.ones(1),
offset=torch.ones(1),
indices=time_index,
reverse=True,
)
tf2 = Log10(
indices=time_index
) # taking log of time dimension for better extrapolation
if bounds is not None:
transformed_bounds = tf2(tf1(bounds))
tf3 = Normalize(
d, bounds=transformed_bounds
) # normalizing after log(t + 1) transform
else:
tf3 = Normalize(d) # normalizing after log(t + 1) transform
return ChainedInputTransform(tf1=tf1, tf2=tf2, tf3=tf3)