1515import pandas as pd
1616
1717from pyspark .sql import DataFrame
18+ from pyspark .sql .types import StructField , StructType , DoubleType , BooleanType
1819from typing import Optional , List , Union
1920
2021from ...._pipeline_utils .models import (
@@ -210,6 +211,22 @@ def libraries() -> Libraries:
210211 def settings () -> dict :
211212 return {}
212213
@staticmethod
def _build_result_schema(df: DataFrame) -> StructType:
    """
    Builds the schema of the detection result for the given DataFrame.

    The result keeps every field of the input schema, in its original order,
    and appends two nullable columns: `mad_zscore` (double) and
    `is_anomaly` (boolean).

    Parameters:
        df (DataFrame): DataFrame whose schema is extended.

    Returns:
        StructType: The input fields followed by the two score columns.
    """
    score_fields = [
        StructField("mad_zscore", DoubleType(), True),
        StructField("is_anomaly", BooleanType(), True),
    ]
    return StructType([*df.schema.fields, *score_fields])
223+
@staticmethod
def _empty_result_df(df: DataFrame, schema: StructType) -> DataFrame:
    """
    Create an empty DataFrame with the correct schema using pandas.

    Parameters:
        df (DataFrame): DataFrame whose Spark session is used to build the result.
        schema (StructType): Schema the empty result must carry.

    Returns:
        DataFrame: A Spark DataFrame with no rows and the given schema.
    """
    column_names = schema.fieldNames()
    # A zero-row pandas frame that only carries the column names; the Spark
    # types come from the explicit schema passed below.
    no_rows_pdf = pd.DataFrame(columns=column_names)
    spark = df.sparkSession
    return spark.createDataFrame(no_rows_pdf, schema=schema)
229+
def detect(self, df: DataFrame) -> DataFrame:
    """
    Detects anomalies in the input DataFrame using the configured MAD scorer.

    The input is collected to pandas, the `value` column is scored with
    `self.scorer`, and only the rows flagged as anomalous are returned.

    Parameters:
        df (DataFrame): Input Spark DataFrame. Must contain a `value` column.

    Returns:
        DataFrame: The anomalous rows only, with the input columns followed by:
            - `mad_zscore`: Score returned by the scorer for `value`.
            - `is_anomaly`: Boolean anomaly flag.
        The result always carries this schema, including when the input is
        empty or no row is flagged.
    """
    result_schema = self._build_result_schema(df)

    pdf = df.toPandas()
    if pdf.empty:
        # Nothing to score; return a typed empty result instead of calling
        # the scorer on an empty series.
        return self._empty_result_df(df, result_schema)

    scores = self.scorer.score(pdf["value"])
    pdf["mad_zscore"] = scores
    pdf["is_anomaly"] = self.scorer.is_anomaly(scores)

    # Keep only the flagged rows and put the columns in schema order once,
    # so the explicit schema lines up with the pandas columns.
    anomalies_pdf = pdf[pdf["is_anomaly"]].copy()
    anomalies_pdf = anomalies_pdf[result_schema.fieldNames()]

    if anomalies_pdf.empty:
        # No anomalies: build the empty result from the schema rather than
        # from an empty pandas frame with no usable type information.
        return self._empty_result_df(df, result_schema)

    return df.sparkSession.createDataFrame(anomalies_pdf, schema=result_schema)
238267
239268
240269class DecompositionMadAnomalyDetection (AnomalyDetectionInterface ):
@@ -368,6 +397,16 @@ def _decompose(self, df: DataFrame) -> DataFrame:
368397 else :
369398 raise ValueError (f"Unsupported decomposition method: { self .decomposition } " )
370399
@staticmethod
def _build_result_schema(df: DataFrame) -> StructType:
    """
    Builds the schema of the detection result for the given DataFrame.

    Delegates to `MadAnomalyDetection._build_result_schema` so both detectors
    share a single definition of the result columns: the input fields, in
    order, followed by nullable `mad_zscore` (double) and `is_anomaly`
    (boolean).

    Parameters:
        df (DataFrame): DataFrame whose schema is extended.

    Returns:
        StructType: The input fields followed by the two score columns.
    """
    return MadAnomalyDetection._build_result_schema(df)
409+
def detect(self, df: DataFrame) -> DataFrame:
    """
    Detects anomalies by scoring the decomposition residuals using the configured MAD scorer.

    The input is decomposed with `self._decompose`, collected to pandas,
    sorted by `self.timestamp_column`, and the `residual` column is scored
    with `self.scorer`. Only the rows flagged as anomalous are returned.

    Parameters:
        df (DataFrame): Input Spark DataFrame to decompose and score.

    Returns:
        DataFrame: The anomalous rows only, with the columns of the
        decomposed DataFrame followed by:
            - `mad_zscore`: MAD-based anomaly score computed on `residual`.
            - `is_anomaly`: Boolean anomaly flag.
        The result always carries this schema, including when there are no
        rows to score or no row is flagged.
    """
    decomposed_df = self._decompose(df)
    result_schema = self._build_result_schema(decomposed_df)

    pdf = decomposed_df.toPandas().sort_values(self.timestamp_column)

    if pdf.empty:
        # Nothing to score; return a typed empty result instead of calling
        # the scorer on an empty series.
        return MadAnomalyDetection._empty_result_df(decomposed_df, result_schema)

    scores = self.scorer.score(pdf["residual"])
    pdf["mad_zscore"] = scores
    pdf["is_anomaly"] = self.scorer.is_anomaly(scores)

    # Keep only the flagged rows and put the columns in schema order once,
    # so the explicit schema lines up with the pandas columns.
    anomalies_pdf = pdf[pdf["is_anomaly"]].copy()
    anomalies_pdf = anomalies_pdf[result_schema.fieldNames()]

    if anomalies_pdf.empty:
        # No anomalies: build the empty result from the schema rather than
        # from an empty pandas frame with no usable type information.
        return MadAnomalyDetection._empty_result_df(decomposed_df, result_schema)

    return df.sparkSession.createDataFrame(anomalies_pdf, schema=result_schema)
0 commit comments