This repository was archived by the owner on Mar 11, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 354
Expand file tree
/
Copy pathcombine.py
More file actions
342 lines (286 loc) · 13.5 KB
/
combine.py
File metadata and controls
342 lines (286 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
#
# Copyright (c) 2023 salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
#
"""
Rules for combining the outputs of multiple time series models.
"""
from abc import abstractmethod
from collections import OrderedDict
import copy
import logging
from typing import List, Optional, Union
import numpy as np
from merlion.evaluate.anomaly import TSADMetric
from merlion.evaluate.forecast import ForecastMetric
from merlion.utils import UnivariateTimeSeries, TimeSeries
from merlion.utils.misc import AutodocABCMeta
logger = logging.getLogger(__name__)
def _align_outputs(all_model_outs: List[TimeSeries], target: TimeSeries) -> List[Optional[TimeSeries]]:
    """
    Align every model output onto a single common set of time stamps.

    If ``target`` is given, the reference index is the slice of ``target`` spanning
    the earliest/latest time stamps seen across the model outputs; otherwise it is
    the union of all output time stamps. ``None`` entries pass through unchanged.
    """
    present = [out for out in all_model_outs if out is not None]
    if not present:
        return [None] * len(all_model_outs)
    if target is None:
        # No target: align to the union of every output's time index.
        time_stamps = np.unique(np.concatenate([out.to_pd().index for out in present]))
    else:
        # With a target: align to the target's index, restricted to the span
        # actually covered by the model outputs.
        t0 = min(min(u.index[0] for u in out.univariates) for out in present)
        tf = max(max(u.index[-1] for u in out.univariates) for out in present)
        time_stamps = target.to_pd()[t0:tf].index
    return [out.align(reference=time_stamps) if out is not None else None for out in all_model_outs]
class CombinerBase(metaclass=AutodocABCMeta):
    """
    Abstract base class for combining the outputs of multiple models. Subclasses
    should implement the abstract method ``_combine_univariates``. All combiners
    are callable objects.

    .. automethod:: __call__
    """

    def __init__(self, abs_score=False):
        """
        :param abs_score: whether to take the absolute value of the model
            outputs. Useful for anomaly detection.
        """
        self.abs_score = abs_score
        # Number of models this combiner expects; None until train() or the
        # first __call__ sets it.
        self.n_models = None
        # Manual overrides: maps model index -> bool (whether that model is used).
        self._override_models_used = {}

    def reset(self):
        # Drop all manual model-used overrides.
        self._override_models_used = {}

    @property
    def requires_training(self):
        # Base combiners are stateless rules; subclasses that fit to data override this.
        return False

    def to_dict(self, _skipped_keys=None):
        """
        :param _skipped_keys: attribute names to omit from the returned state.
        :return: a dict of this combiner's state, with the class name under ``"name"``.
        """
        skipped_keys = set() if _skipped_keys is None else _skipped_keys
        state = {k: copy.deepcopy(v) for k, v in self.__dict__.items() if k not in skipped_keys}
        state["name"] = type(self).__name__
        return state

    @classmethod
    def from_dict(cls, state):
        """
        :param state: a dict of the form returned by ``to_dict``.
        :return: a new combiner initialized from ``state``.
        """
        state = copy.copy(state)
        state.pop("name", None)
        # n_models and the overrides are not constructor args; restore them manually.
        n_models = state.pop("n_models", None)
        override_models_used = state.pop("_override_models_used", {})
        ret = cls(**state)
        ret.n_models = n_models
        # Serialization (e.g. JSON) can turn int keys into strings; convert back.
        ret._override_models_used = {int(k): v for k, v in override_models_used.items()}
        return ret

    def __copy__(self):
        # Copy via dict round-trip so all state goes through from_dict's normalization.
        return self.from_dict(self.to_dict())

    def __deepcopy__(self, memodict=None):
        if memodict is None:
            memodict = {}
        # to_dict already deep-copies values, so __copy__ suffices here.
        return self.__copy__()

    @abstractmethod
    def _combine_univariates(self, univariates: List[UnivariateTimeSeries]):
        """
        Combine a list of aligned univariates (entries may be ``None``) into one
        univariate. Implemented by subclasses.
        """
        raise NotImplementedError

    def set_model_used(self, i: int, used: bool):
        # Manually force model i to be used (True) or ignored (False).
        self._override_models_used[i] = used

    def get_model_used(self, i: int):
        """
        :param i: a model index.
        :return: whether model ``i`` is used. Before training (``n_models`` unset),
            only the manual overrides are consulted (defaulting to ``True``).
        """
        return self.models_used[i] if self.n_models is not None else self._override_models_used.get(i, True)

    @property
    def models_used(self) -> List[bool]:
        """
        :return: which models are actually used to make predictions.
        """
        assert self.n_models is not None, "Combiner must be trained to determine which models are used"
        # Manual overrides take precedence over the subclass's selection rule.
        return [self._override_models_used.get(i, used) for i, used in enumerate(self._models_used)]

    @property
    def _models_used(self) -> List[bool]:
        # Default selection rule: use every model. Subclasses may override.
        return [True] * self.n_models

    def train(self, all_model_outs: List[TimeSeries], target: TimeSeries = None, **kwargs) -> TimeSeries:
        """
        Trains the model combination rule.

        :param all_model_outs: a list of time series, with each time series
            representing the output of a single model.
        :param target: a target time series (e.g. labels)
        :return: a single time series of combined model outputs on this training data.
        """
        self.n_models = len(all_model_outs)
        return self(all_model_outs, target, _check_dim=False)

    def __call__(self, all_model_outs: List[TimeSeries], target: TimeSeries, _check_dim=True) -> TimeSeries:
        """
        Applies the model combination rule to combine multiple model outputs.

        :param all_model_outs: a list of time series, with each time series
            representing the output of a single model.
        :param target: a target time series (e.g. labels)
        :param _check_dim: NOTE(review): currently unused in this implementation.
        :return: a single time series of combined model outputs on this training data.
        """
        # If the target is a list of time series, concatenate each model's list of
        # outputs (and the target list) into one long time series apiece.
        if isinstance(target, list):
            new_all_model_outs = []
            for i, out in enumerate(all_model_outs):
                if out is None:
                    new_all_model_outs.append(out)
                else:
                    assert isinstance(out, list) and len(out) == len(target), (
                        f"If target is a list of time series, each model output should be a "
                        f"list with the same length, but target has length {len(target)}, "
                        f"while model output {i} is a {type(out).__name__} of length {len(out)}"
                    )
                    new_all_model_outs.append(sum(out[1:], out[0]))
            target = sum(target[1:], target[0])
            all_model_outs = new_all_model_outs
        # j indexes a non-None output, used below for dim/name bookkeeping.
        js = [j for j, out in enumerate(all_model_outs) if out is not None]
        assert len(js) > 0, "`all_model_outs` cannot all be `None`"
        j = js[0]
        assert all(out.dim == all_model_outs[j].dim for out in all_model_outs if out is not None)
        if self.n_models is None:
            self.n_models = len(all_model_outs)
        models_used = self.models_used
        if len(all_model_outs) == self.n_models:
            # Outputs were given for every model: keep only the used ones. After
            # filtering, index 0 is the first used model's output (assumed non-None).
            j = 0
            all_model_outs = [x for x, used in zip(all_model_outs, models_used) if used]
        elif len(all_model_outs) != sum(models_used):
            # Callers may also pass outputs for just the used models; any other
            # count is an error.
            raise RuntimeError(
                f"Expected either {self.n_models} or {sum(models_used)} "
                f"model outputs, but got {len(all_model_outs)} model outputs "
                f"instead."
            )
        # Resample every output onto a shared time index before combining.
        all_model_outs = _align_outputs(all_model_outs, target)
        if all(out is None for out in all_model_outs):
            return None
        # Combine each variable (column) independently across models.
        combined = OrderedDict()
        for i in range(all_model_outs[j].dim):
            name = all_model_outs[j].names[i]
            all_i = [None if ts is None else ts.univariates[ts.names[i]] for ts in all_model_outs]
            combined[name] = self._combine_univariates(all_i)
        return TimeSeries(combined)
class Mean(CombinerBase):
    """
    Combines multiple models by taking their mean prediction.
    """

    @property
    def weights(self) -> np.ndarray:
        """Uniform weights over the models currently in use."""
        n_used = sum(self.models_used)
        return np.full(shape=n_used, fill_value=1 / n_used)

    def _combine_univariates(self, univariates: List[UnivariateTimeSeries]) -> UnivariateTimeSeries:
        present = [u for u in univariates if u is not None]
        # Keep only the weights of models that actually produced output, then renormalize.
        w = np.asarray([wt for wt, u in zip(self.weights, univariates) if u is not None])
        w = w / w.sum()
        first = present[0]
        vals = [u.np_values for u in present]
        if self.abs_score and sum(self.models_used) > 1:
            # Weighted-average the magnitudes, then restore the majority sign
            # (ties broken toward -1) at each time stamp.
            signs = np.median(np.sign(vals), axis=0)
            signs[signs == 0] = -1
            new_vals = signs * np.dot(w, [np.abs(v) for v in vals])
        else:
            new_vals = np.dot(w, vals)
        return UnivariateTimeSeries(first.time_stamps, new_vals, first.name)
class Median(CombinerBase):
    """
    Combines multiple models by taking their median prediction.
    """

    def _combine_univariates(self, univariates: List[UnivariateTimeSeries]) -> UnivariateTimeSeries:
        present = [u for u in univariates if u is not None]
        first = present[0]
        vals = [u.np_values for u in present]
        if self.abs_score and sum(self.models_used) > 1:
            # Median of the magnitudes, with the majority sign (ties toward -1)
            # restored at each time stamp.
            signs = np.median(np.sign(vals), axis=0)
            signs[signs == 0] = -1
            new_vals = signs * np.median([np.abs(v) for v in vals], axis=0)
        else:
            new_vals = np.median(vals, axis=0)
        return UnivariateTimeSeries(first.time_stamps, new_vals, first.name)
class Max(CombinerBase):
    """
    Combines multiple models by taking their max prediction.
    """

    def _combine_univariates(self, univariates: List[UnivariateTimeSeries]) -> UnivariateTimeSeries:
        """
        :param univariates: aligned model outputs for one variable; entries may be ``None``.
        :return: the element-wise max over the non-``None`` outputs (on absolute
            values with the majority sign restored, when ``abs_score`` is set).
        """
        non_none = [var for var in univariates if var is not None]
        v = non_none[0]
        if self.abs_score and sum(self.models_used) > 1:
            # Majority sign at each time stamp (ties broken toward -1).
            signs = np.median(np.sign([var.np_values for var in non_none]), axis=0)
            signs[signs == 0] = -1
            # Bug fix: take the MAX of the absolute scores, not the median.
            # The original used np.median here (copy-pasted from Median), which
            # contradicts both this class's contract and the non-abs branch below.
            new_vals = signs * np.max([np.abs(var.np_values) for var in non_none], axis=0)
        else:
            new_vals = np.max([var.np_values for var in non_none], axis=0)
        return UnivariateTimeSeries(v.time_stamps, new_vals, v.name)
class ModelSelector(Mean):
    """
    Takes the mean of the best models, where the models are ranked according to
    the value of an evaluation metric.
    """

    def __init__(self, metric: Union[str, TSADMetric, ForecastMetric], abs_score=False):
        """
        :param metric: the evaluation metric to use, either as an enum member or
            as a string of the form ``"<EnumClass>.<member>"`` (e.g. ``"TSADMetric.F1"``).
        :param abs_score: whether to take the absolute value of the model
            outputs. Useful for anomaly detection.
        """
        super().__init__(abs_score=abs_score)
        if isinstance(metric, str):
            # Parse "<EnumClass>.<member>" into the actual enum member.
            metric_cls, name = metric.split(".", maxsplit=1)
            metric_cls = {c.__name__: c for c in [ForecastMetric, TSADMetric]}[metric_cls]
            metric = metric_cls[name]
        self.metric = metric
        # Per-model metric values computed by train(); None until trained.
        self.metric_values = None

    @property
    def invert(self):
        """
        ``True`` if lower metric values are better (forecast metrics and mean
        time to detect), ``False`` otherwise.
        """
        if isinstance(self.metric, ForecastMetric):
            return True
        if self.metric is TSADMetric.MeanTimeToDetect:
            return True
        return False

    @property
    def requires_training(self):
        # Model ranking needs metric values, which only train() produces.
        return True

    def to_dict(self, _skipped_keys=None):
        skipped_keys = set() if _skipped_keys is None else _skipped_keys
        # Serialize the metric enum as "<EnumClass>.<member>" so the state dict
        # round-trips through __init__.
        state = super().to_dict(skipped_keys.union({"metric"}))
        state["metric"] = f"{type(self.metric).__name__}.{self.metric.name}"
        return state

    @classmethod
    def from_dict(cls, state):
        # Bug fix: copy before popping, so the caller's dict is not mutated
        # (matches CombinerBase.from_dict, which also copies defensively).
        state = copy.copy(state)
        # Extract the metric values from the state (to set manually later)
        metric_values = state.pop("metric_values", None)
        ret = super().from_dict(state)
        ret.metric_values = metric_values
        return ret

    @property
    def _models_used(self) -> List[bool]:
        assert self.n_models is not None, "Combiner must be trained to determine which models are used"
        # Pick the best metric value among models that haven't been manually
        # disabled, but build the used-mask over all models.
        used_metric_values = [v for i, v in enumerate(self.metric_values) if self._override_models_used.get(i, True)]
        val = np.min(used_metric_values) if self.invert else np.max(used_metric_values)
        return (np.asarray(self.metric_values) == val).tolist()

    def train(self, all_model_outs: List[TimeSeries], target: TimeSeries = None, **kwargs) -> TimeSeries:
        """
        Evaluates each model's output against ``target`` and records the per-model
        metric values used to rank models.

        :param all_model_outs: a list of time series (or lists of time series),
            one entry per model.
        :param target: the ground truth time series (or a list thereof). If
            ``None`` and no metric values were previously recorded, every model
            gets a dummy metric value of 1.
        :return: a single time series of combined model outputs on this training data.
        """
        metric_values = []
        self.n_models = len(all_model_outs)
        for i, model_out in enumerate(all_model_outs):
            if not self._override_models_used.get(i, True):
                # Manually disabled models get the worst-possible metric value.
                metric_values.append(np.inf if self.invert else -np.inf)
            elif target is None and self.metric_values is None:
                metric_values.append(1)
            elif target is not None and not isinstance(target, list):
                metric_values.append(self.metric.value(ground_truth=target, predict=model_out, **kwargs))
            elif isinstance(target, list):
                assert isinstance(model_out, list) and len(model_out) == len(target), (
                    f"If target is a list of time series, each model output should be a "
                    f"list with the same length, but target has length {len(target)}, "
                    f"while model output {i} is a {type(model_out).__name__} of length "
                    f"{len(model_out)}"
                )
                # Average the metric over the list of (target, prediction) pairs.
                vals = [self.metric.value(ground_truth=y, predict=yhat, **kwargs) for y, yhat in zip(target, model_out)]
                metric_values.append(np.mean(vals))
        # When target is None but metric values already exist, nothing is appended
        # for un-overridden models; this length check deliberately keeps the
        # previously recorded values in that case.
        if len(metric_values) == len(all_model_outs):
            self.metric_values = metric_values
        return self(all_model_outs, target)
class MetricWeightedMean(ModelSelector):
    """
    Computes a weighted average of model outputs with weights proportional to
    the metric values (or their inverses).
    """

    @property
    def _models_used(self) -> List[bool]:
        # Every model contributes through its weight, so bypass ModelSelector's
        # best-model selection and use the base-class rule (all models used).
        return CombinerBase._models_used.fget(self)

    @property
    def weights(self) -> np.ndarray:
        """Normalized weights proportional to the metric values (reciprocals when lower is better)."""
        raw = np.asarray(self.metric_values)
        if self.invert:
            raw = 1 / raw
        return raw / raw.sum()
class CombinerFactory(object):
    """
    Factory object for creating combiner objects.
    """

    @classmethod
    def create(cls, name: str, **kwargs) -> CombinerBase:
        """
        :param name: the class name of the combiner to create.
        :param kwargs: state forwarded to the combiner class's ``from_dict``.
        :return: the requested combiner, initialized from ``kwargs``.
        """
        supported = (Mean, Median, Max, ModelSelector, MetricWeightedMean)
        registry = {combiner.__name__: combiner for combiner in supported}
        return registry[name].from_dict(kwargs)