diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index 2e6cdeb0a..d75c56ba9 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
 import logging
 from typing import TYPE_CHECKING
 
+import numpy as np
 from dask import delayed
 
 from dask_sql.datacontainer import DataContainer
@@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
 
             delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
             model = delayed_model[0].compute()
-            model = ParallelPostFit(estimator=model)
+            output_meta = np.array([])
+            model = ParallelPostFit(
+                estimator=model,
+                predict_meta=output_meta,
+                predict_proba_meta=output_meta,
+                transform_meta=output_meta,
+            )
 
         else:
             model.fit(X, y, **fit_kwargs)
diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index eb5e4b69f..9e03902a8 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,8 @@
 import uuid
 from typing import TYPE_CHECKING
 
+import numpy as np
+
 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 
@@ -59,7 +61,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         model, training_columns = context.schema[schema_name].models[model_name]
 
         df = context.sql(sql_select)
-        prediction = model.predict(df[training_columns])
+        part = df[training_columns]
+        try:
+            output_meta = model.predict_meta
+        except AttributeError:
+            output_meta = None
+        if part.shape[0].compute() == 0 and output_meta is not None:
+            empty_output = self.handle_empty_partitions(output_meta)
+            if empty_output is not None:
+                return empty_output
+        prediction = model.predict(part)
         predicted_df = df.assign(target=prediction)
 
         # Create a temporary context, which includes the
@@ -79,3 +90,32 @@
         dc = DataContainer(predicted_df, cc)
 
         return dc
+
+    def handle_empty_partitions(self, output_meta):
+        if hasattr(output_meta, "__array_function__"):
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+            ar = np.zeros(
+                shape=shape,
+                dtype=output_meta.dtype,
+                like=output_meta,
+            )
+            return ar
+        elif "scipy.sparse" in type(output_meta).__module__:
+            # Sparse matrices don't support the `like` kwarg because
+            # __array_function__ is not implemented for them.
+            # See https://github.com/scipy/scipy/issues/10362
+            # Note: the construction below works for both cupy and scipy sparse matrices
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+
+            ar = type(output_meta)(shape, dtype=output_meta.dtype)
+            return ar
+        elif hasattr(output_meta, "iloc"):
+            return output_meta.iloc[:0, :]
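
For context on the `create_model.py` hunk: when `ParallelPostFit` is constructed without meta, dask-ml infers the output metadata by calling `predict`/`predict_proba`/`transform` on an empty partition, which some estimators (notably GPU-backed ones) cannot handle. Passing an explicit empty `np.array([])` skips that inference. A minimal sketch of the idea, not part of this patch, assuming a dask-ml version whose `ParallelPostFit` accepts the `predict_meta`/`predict_proba_meta`/`transform_meta` keywords (as the hunk above relies on) and scikit-learn for a toy estimator:

```python
import numpy as np
import dask.array as da
from dask_ml.wrappers import ParallelPostFit
from sklearn.linear_model import LogisticRegression

X = da.random.random((100, 4), chunks=(25, 4))
y = (da.random.random(100, chunks=25) > 0.5).astype(int)

# Fit on concrete data, then wrap the fitted estimator for blockwise predict.
estimator = LogisticRegression().fit(X.compute(), y.compute())

# With meta given explicitly, dask-ml does not need to probe the estimator
# with an empty partition to figure out the output type.
model = ParallelPostFit(
    estimator=estimator,
    predict_meta=np.array([]),
    predict_proba_meta=np.array([]),
    transform_meta=np.array([]),
)
print(model.predict(X).compute().shape)  # (100,)
```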
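For context on `handle_empty_partitions`: it builds a zero-row object of the same family as the stored `predict_meta`, with one branch per meta type: array-likes implementing `__array_function__` (numpy, cupy), scipy/cupy sparse matrices, and dataframes exposing `.iloc`. A hedged sketch of what each branch produces; the meta values below are illustrative only:

```python
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix

# Branch 1: array-likes with __array_function__ -> np.zeros(..., like=meta).
# The `like=` kwarg (numpy >= 1.20) dispatches creation to the meta's array type.
np_meta = np.array([])
print(np.zeros(shape=0, dtype=np_meta.dtype, like=np_meta).shape)  # (0,)

# Branch 2: sparse matrices -> type(meta)(shape, dtype=...), since they don't
# implement __array_function__ (see scipy/scipy#10362).
sp_meta = csr_matrix((1, 3))
print(type(sp_meta)((0, 3), dtype=sp_meta.dtype).shape)  # (0, 3)

# Branch 3: anything with .iloc (pandas/cudf) -> meta.iloc[:0, :].
pd_meta = pd.DataFrame({"a": [1.0], "b": [2.0]})
print(pd_meta.iloc[:0, :].shape)  # (0, 2)
```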