Signed-off-by: Rene Jokiel <Rene.Jokiel@fau.de>
Signed-off-by: Christoph Huy <christoph.huy@campus.tu-berlin.de>
Signed-off-by: Mehdi-kbz <141425685+Mehdi-kbz@users.noreply.github.com>
Added tests for IqrAnomalyDetection and IqrAnomalyDetectionRollingWindow using Pytest.

Signed-off-by: Mehdi-kbz <141425685+Mehdi-kbz@users.noreply.github.com>
Pull request overview
This PR introduces anomaly detection capabilities to the RTDIP SDK, implementing statistical methods for identifying outliers in time-series data using PySpark. The implementation includes both Median Absolute Deviation (MAD) and Interquartile Range (IQR) based detection methods.
- Adds MAD and IQR anomaly detection algorithms with rolling window variants
- Includes STL+MAD composite method for handling seasonal patterns
- Provides comprehensive test coverage with synthetic and real-world data scenarios
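To illustrate the MAD-based approach the PR describes, here is a minimal standalone sketch in pandas (simplified relative to the PR's actual classes; the `value` column name and the 3.0 threshold mirror the diff, and the zero-MAD short circuit is one reasonable convention, not necessarily the PR's):

```python
import pandas as pd


def mad_anomalies(values, threshold=3.0):
    """Flag points whose robust (MAD-based) z-score exceeds `threshold`."""
    s = pd.Series(values, dtype=float)
    median = s.median()
    # Median absolute deviation: robust spread estimate, unlike std dev
    # it is not dragged upward by the very outliers we want to find.
    mad = (s - median).abs().median()
    if mad == 0:
        # No spread at all: flag nothing rather than divide by zero.
        return pd.Series(False, index=s.index)
    # 0.6745 rescales MAD so the score is comparable to a normal z-score.
    z = 0.6745 * (s - median) / mad
    return z.abs() > threshold


flags = mad_anomalies([10, 11, 10, 12, 11, 100])
```

Only the final point (100 against a background near 11) crosses the threshold here; the inliers produce robust z-scores well below 1.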
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 23 comments.
| File | Description |
|---|---|
| `src/sdk/python/rtdip_sdk/pipelines/anomaly_detection/interfaces.py` | Defines the base interface for anomaly detection components |
| `src/sdk/python/rtdip_sdk/pipelines/anomaly_detection/spark/mad_anomaly_detection.py` | Implements MAD-based anomaly detection with rolling window and STL variants |
| `src/sdk/python/rtdip_sdk/pipelines/anomaly_detection/spark/iqr_anomaly_detection.py` | Implements IQR-based anomaly detection with rolling window support |
| `tests/sdk/python/rtdip_sdk/pipelines/anomaly_detection/spark/test_mad.py` | Test suite for the MAD anomaly detection methods |
| `tests/sdk/python/rtdip_sdk/pipelines/anomaly_detection/spark/test_iqr_anomaly_detection.py` | Test suite for the IQR anomaly detection methods |
| `amos_team_resources/anomaly_detection/mad/visualize_test.py` | Visualization script for MAD detection results |
| `amos_team_resources/anomaly_detection/iqr/visualizeIQR.py` | Visualization script for IQR detection results |
| `src/sdk/python/rtdip_sdk/pipelines/anomaly_detection/__init__.py` | Package initialization with copyright header |
| `src/sdk/python/rtdip_sdk/pipelines/anomaly_detection/spark/__init__.py` | Spark submodule initialization |
| `tests/sdk/python/rtdip_sdk/pipelines/anomaly_detection/__init__.py` | Test package initialization |
| `tests/sdk/python/rtdip_sdk/pipelines/anomaly_detection/spark/__init__.py` | Test Spark submodule initialization |
Code context:

```python
pdf["is_anomaly"] = pdf["rolling_mad_z"].abs() > self.threshold

# keep only anomalies
anomalies_pdf = pdf[pdf["is_anomaly"] == True].copy()
```

**Copilot:** Avoid explicit comparison with `True`. Use `pdf["is_anomaly"]` instead of `pdf["is_anomaly"] == True` for more Pythonic code.

```suggestion
anomalies_pdf = pdf[pdf["is_anomaly"]].copy()
```
Code context:

```python
pdf["is_anomaly"] = pdf["mad_zscore"].abs() > self.threshold

# keep ONLY anomalies
anomalies_pdf = pdf[pdf["is_anomaly"] == True].copy()
```

**Copilot:** Avoid explicit comparison with `True`. Use `pdf["is_anomaly"]` instead of `pdf["is_anomaly"] == True` for more Pythonic code.

```suggestion
anomalies_pdf = pdf[pdf["is_anomaly"]].copy()
```
Code context:

```python
pdf["is_anomaly"] = (pdf["value"] < lower_bound) | (pdf["value"] > upper_bound)

# Keep only anomalies
anomalies_pdf = pdf[pdf["is_anomaly"] == True].copy()
```

**Copilot:** Avoid explicit comparison with `True`. Use `pdf["is_anomaly"]` instead of `pdf["is_anomaly"] == True` for more Pythonic code.

```suggestion
anomalies_pdf = pdf[pdf["is_anomaly"]].copy()
```
Code context:

```python
if mad == 0:
    pdf["mad_zscore"] = 0
else:
    pdf["mad_zscore"] = 0.6745 * (pdf["value"] - median) / mad
```

**Copilot:** The check `if mad == 0` on line 88 is unreachable because line 86 sets `mad = float(max(mad, 1.0))`, ensuring `mad >= 1.0`. Consider removing the dead code, or adjusting the clamping logic if zero MAD needs to be handled differently.

```suggestion
pdf["mad_zscore"] = 0.6745 * (pdf["value"] - median) / mad
```
Docstring context:

```
Returns only the detected anomalies.

:param df: Spark DataFrame containing a numeric "value" column.
```

**Copilot:** The docstring states that the DataFrame should contain a column named "value", but it doesn't document that a "timestamp" column is required for the rolling window variant (as seen on line 151, where it is sorted). Consider adding this requirement to the documentation or making it an explicit parameter.

```suggestion
:param df: Spark DataFrame containing a numeric "value" column and a "timestamp" column.
    The "timestamp" column is required for sorting and rolling window calculations.
```
Code context:

```python
pdf["is_anomaly"] = (pdf["value"] < lower_bound) | (pdf["value"] > upper_bound)

# Keep only anomalies
anomalies_pdf = pdf[pdf["is_anomaly"] == True].copy()
```

**Copilot:** Avoid explicit comparison with `True`. Use `pdf["is_anomaly"]` instead of `pdf["is_anomaly"] == True` for more Pythonic code.

```suggestion
anomalies_pdf = pdf[pdf["is_anomaly"]].copy()
```
Code context:

```python
# Spark → Pandas
pdf = df.toPandas()
```

**Copilot:** Converting the entire Spark DataFrame to Pandas with `toPandas()` defeats the purpose of using Spark and can cause memory issues with large datasets. Consider implementing the IQR calculation using native Spark operations (e.g., `approxQuantile` for percentiles) to maintain scalability.
Code context:

```python
rolling_mad = rolling_mad.apply(lambda x: max(x, 1.0))

# Robust rolling z-score
pdf["rolling_mad_z"] = 0.6745 * (pdf["value"] - rolling_median) / rolling_mad
```

**Copilot:** Magic number `0.6745` should be defined as a named constant with a comment explaining its significance. It is the scaling factor that relates MAD to an equivalent standard deviation (MAD / 0.6745 ≈ σ for normally distributed data).
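One way that refactor could look (a hypothetical standalone version; the constant name is invented and the rolling-MAD formula is simplified relative to the PR's):

```python
import pandas as pd

# 0.6745 is approximately the 75th percentile of the standard normal
# distribution: for normal data MAD ≈ 0.6745 · σ, so multiplying by it
# makes the robust z-score comparable to an ordinary z-score.
MAD_TO_SIGMA_SCALE = 0.6745


def rolling_mad_zscore(values, window):
    """Robust rolling z-score (simplified sketch, not the PR's class)."""
    s = pd.Series(values, dtype=float)
    med = s.rolling(window).median()
    # Rolling median of absolute deviations from the rolling median.
    mad = (s - med).abs().rolling(window).median()
    mad = mad.clip(lower=1.0)  # mirrors the PR's clamp against tiny MAD
    return MAD_TO_SIGMA_SCALE * (s - med) / mad
```

With a flat series ending in a spike, e.g. `rolling_mad_zscore([10, 10, 10, 10, 10, 100], 3)`, the final score is 0.6745 · 90 while the preceding fully-windowed point scores 0.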
Code context:

```python
iqr = q3 - q1

# Clamp IQR to prevent over-sensitive detection when data has no spread
iqr = max(iqr, 1.0)
```

**Copilot:** Magic number `1.0` is used as the minimum threshold for IQR clamping. Consider defining it as a named constant (e.g., `MIN_IQR_THRESHOLD`) with documentation explaining why this minimum is necessary to prevent over-sensitive anomaly detection when the data has no spread.
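A sketch of that refactor (the constant value and the 1.5 fence multiplier follow the diff; everything else is illustrative):

```python
import pandas as pd

# Named constant replacing the magic 1.0. Without a floor, near-constant
# data yields IQR ≈ 0, collapsing the fences onto the quartiles so that
# every tiny fluctuation would be flagged as an anomaly.
MIN_IQR_THRESHOLD = 1.0


def iqr_bounds(values, k=1.5):
    """Return (lower, upper) IQR fences with the minimum-spread clamp."""
    s = pd.Series(values, dtype=float)
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = max(q3 - q1, MIN_IQR_THRESHOLD)
    return q1 - k * iqr, q3 + k * iqr
```

For a constant series like `[10, 10, 10, 10]` the clamp keeps the fences at (8.5, 11.5) instead of degenerating to (10, 10).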
Code context:

```python
rolling_q1 = pdf["value"].rolling(self.window_size).quantile(0.25)
rolling_q3 = pdf["value"].rolling(self.window_size).quantile(0.75)
rolling_iqr = rolling_q3 - rolling_q1

# Clamp IQR to prevent over-sensitivity
rolling_iqr = rolling_iqr.apply(lambda x: max(x, 1.0))

# Compute rolling bounds
lower_bound = rolling_q1 - self.threshold * rolling_iqr
upper_bound = rolling_q3 + self.threshold * rolling_iqr

# Flag anomalies outside the rolling bounds
pdf["is_anomaly"] = (pdf["value"] < lower_bound) | (pdf["value"] > upper_bound)
```

**Copilot:** The rolling window operations on lines 153-155 will produce NaN values for the first `window_size - 1` rows. These NaN values propagate into the anomaly detection logic, potentially causing unexpected behavior. Consider handling NaN values explicitly (e.g., by using `.dropna()` or by setting the `min_periods` parameter on the rolling window).
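A sketch of the `min_periods` option (standalone and simplified; the PR operates on a DataFrame column rather than a bare series):

```python
import pandas as pd


def rolling_iqr_flags(values, window, k=1.5):
    """Flag values outside rolling IQR fences, without NaN warm-up rows."""
    s = pd.Series(values, dtype=float)
    # min_periods=1 makes the quartiles defined from the first row on,
    # instead of NaN for the first window-1 rows.
    q1 = s.rolling(window, min_periods=1).quantile(0.25)
    q3 = s.rolling(window, min_periods=1).quantile(0.75)
    iqr = (q3 - q1).clip(lower=1.0)  # same minimum-spread clamp as the PR
    lower, upper = q1 - k * iqr, q3 + k * iqr
    # No NaNs remain, so these comparisons cannot silently evaluate to
    # False for rows that were never actually checked.
    return (s < lower) | (s > upper)
```

The trade-off is that early rows are judged against partial windows, which is usually preferable to leaving them unexamined; alternatively, `.dropna()` on the bounds simply excludes the warm-up rows from detection.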