Skip to content

Commit fac60c1

Browse files
committed
chore(anomaly detection): store datapoint even if we do not have enough history for AD
1 parent ecca1fd commit fac60c1

File tree

2 files changed

+261
-328
lines changed

2 files changed

+261
-328
lines changed

src/seer/anomaly_detection/anomaly_detection.py

+83-33
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime
22
import logging
33
import random
4-
from typing import List, Tuple
4+
from typing import List, Optional, Tuple
55

66
import numpy as np
77
import pandas as pd
@@ -15,6 +15,7 @@
1515
from seer.anomaly_detection.detectors.prophet_anomaly_detector import ProphetAnomalyDetector
1616
from seer.anomaly_detection.models import (
1717
AlgoConfig,
18+
DynamicAlert,
1819
MPTimeSeriesAnomalies,
1920
MPTimeSeriesAnomaliesSingleWindow,
2021
TimeSeries,
@@ -141,6 +142,57 @@ def _batch_detect(
141142

142143
return timeseries, batch_anomalies, prophet_df
143144

145+
@sentry_sdk.trace
146+
def _save_and_fire_pruning_task(
147+
self,
148+
timepoint: TimeSeriesPoint,
149+
streamed_anomalies: MPTimeSeriesAnomaliesSingleWindow,
150+
algo_data: Optional[dict],
151+
historic: DynamicAlert,
152+
config: AnomalyDetectionConfig,
153+
alert_data_accessor: AlertDataAccessor,
154+
queue_cleanup_predict_task: bool,
155+
force_cleanup: bool,
156+
) -> None:
157+
# Save new data point
158+
alert_data_accessor.save_timepoint(
159+
external_alert_id=historic.external_alert_id,
160+
timepoint=timepoint,
161+
anomaly=streamed_anomalies,
162+
anomaly_algo_data=algo_data,
163+
)
164+
165+
if queue_cleanup_predict_task:
166+
# Delayed import due to circular imports
167+
from seer.anomaly_detection.tasks import cleanup_timeseries_and_predict
168+
169+
try:
170+
# Set flag and create new task for cleanup if we have enough history and too many old points or not enough predictions remaining
171+
cleanup_predict_config = historic.cleanup_predict_config
172+
if alert_data_accessor.can_queue_cleanup_predict_task(
173+
historic.external_alert_id
174+
) and (
175+
force_cleanup
176+
or (
177+
cleanup_predict_config.num_old_points
178+
>= cleanup_predict_config.num_acceptable_points
179+
or cleanup_predict_config.num_predictions_remaining
180+
<= cleanup_predict_config.num_acceptable_predictions
181+
)
182+
):
183+
alert_data_accessor.queue_data_purge_flag(historic.external_alert_id)
184+
cleanup_timeseries_and_predict.apply_async(
185+
(historic.external_alert_id, cleanup_predict_config.timestamp_threshold),
186+
countdown=random.randint(
187+
0, config.time_period * 45
188+
), # Wait between 0 - time_period * 45 seconds before queuing so the tasks are not all queued at the same time and still have a chance to run before next detection call
189+
)
190+
except Exception as e:
191+
# Reset task and capture exception
192+
alert_data_accessor.reset_cleanup_predict_task(historic.external_alert_id)
193+
sentry_sdk.capture_exception(e)
194+
logger.exception(e)
195+
144196
@inject
145197
@sentry_sdk.trace
146198
def _online_detect(
@@ -205,6 +257,28 @@ def _online_detect(
205257
# Confirm that there is enough data (after purge)
206258
min_data = self._min_required_timesteps(historic.config.time_period)
207259
if len(historic.timeseries.timestamps) < min_data:
260+
# If there is not enough data, we will not detect any anomalies. We still should save the timepoint
261+
# so that we can detect anomalies in the future.
262+
self._save_and_fire_pruning_task(
263+
timepoint=ts_external[0],
264+
streamed_anomalies=MPTimeSeriesAnomaliesSingleWindow(
265+
flags=["none"] * len(ts_external),
266+
scores=[0.0] * len(ts_external),
267+
thresholds=[],
268+
matrix_profile=np.array([]),
269+
window_size=3,
270+
original_flags=[],
271+
confidence_levels=[],
272+
),
273+
algo_data=None,
274+
historic=historic,
275+
config=config,
276+
alert_data_accessor=alert_data_accessor,
277+
queue_cleanup_predict_task=(
278+
len(historic.timeseries.timestamps) == min_data - 1
279+
), # we queue cleanup task if we have just enough data to detect anomalies in the next call
280+
force_cleanup=True,
281+
)
208282
logger.error(f"Not enough timeseries data. At least {min_data} data points required")
209283
raise ClientError(
210284
f"Not enough timeseries data. At least {min_data} data points required"
@@ -223,7 +297,6 @@ def _online_detect(
223297
raise ServerError("Invalid state")
224298

225299
# Run stream detection
226-
227300
# SuSS Window
228301
stream_detector = MPStreamAnomalyDetector(
229302
history_timestamps=historic.timeseries.timestamps,
@@ -276,40 +349,17 @@ def _online_detect(
276349
streamed_anomalies_online = alert_data_accessor.combine_anomalies(
277350
streamed_anomalies, None, anomalies.use_suss[-num_anomlies:]
278351
)
279-
280-
# Save new data point
281-
alert_data_accessor.save_timepoint(
282-
external_alert_id=alert.id,
352+
self._save_and_fire_pruning_task(
283353
timepoint=ts_external[0],
284-
anomaly=streamed_anomalies,
285-
anomaly_algo_data=streamed_anomalies_online.get_anomaly_algo_data(len(ts_external))[0],
354+
streamed_anomalies=streamed_anomalies,
355+
algo_data=streamed_anomalies_online.get_anomaly_algo_data(len(ts_external))[0],
356+
historic=historic,
357+
config=config,
358+
alert_data_accessor=alert_data_accessor,
359+
queue_cleanup_predict_task=True,
360+
force_cleanup=False,
286361
)
287362

288-
# Delayed import due to circular imports
289-
from seer.anomaly_detection.tasks import cleanup_timeseries_and_predict
290-
291-
try:
292-
# Set flag and create new task for cleanup if too many old points or not enough predictions remaining
293-
cleanup_predict_config = historic.cleanup_predict_config
294-
if alert_data_accessor.can_queue_cleanup_predict_task(historic.external_alert_id) and (
295-
cleanup_predict_config.num_old_points
296-
>= cleanup_predict_config.num_acceptable_points
297-
or cleanup_predict_config.num_predictions_remaining
298-
<= cleanup_predict_config.num_acceptable_predictions
299-
):
300-
alert_data_accessor.queue_data_purge_flag(historic.external_alert_id)
301-
cleanup_timeseries_and_predict.apply_async(
302-
(historic.external_alert_id, cleanup_predict_config.timestamp_threshold),
303-
countdown=random.randint(
304-
0, config.time_period * 60
305-
), # Wait between 0 - time_period * 60 seconds before queuing so the tasks are not all queued at the same time
306-
)
307-
except Exception as e:
308-
# Reset task and capture exception
309-
alert_data_accessor.reset_cleanup_predict_task(historic.external_alert_id)
310-
sentry_sdk.capture_exception(e)
311-
logger.exception(e)
312-
313363
return ts_external, streamed_anomalies_online
314364

315365
def _min_required_timesteps(self, time_period, min_num_days=7):

0 commit comments

Comments
 (0)