Handle nullable types and empty partitions before Dask-ML predict #799


Closed · wants to merge 9 commits
12 changes: 11 additions & 1 deletion dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
import logging
from typing import TYPE_CHECKING

import numpy as np
from dask import delayed

from dask_sql.datacontainer import DataContainer
@@ -183,7 +184,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:

delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
if "sklearn" in model_class:
output_meta = np.array([])
Collaborator Author:
@VibhuJawa I wanted to ask your opinion on this check.

Collaborator:

I think we should handle this in the https://github.com/dask-contrib/dask-sql/pull/832/files PR and follow dask/dask-ml#912 for a clean fix. I don't think we should fix it here.

model = ParallelPostFit(
estimator=model,
predict_meta=output_meta,
predict_proba_meta=output_meta,
transform_meta=output_meta,
)
else:
model = ParallelPostFit(estimator=model)

else:
model.fit(X, y, **fit_kwargs)
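For context, here is a minimal standalone sketch of what the create_model.py change above does, assuming dask-ml and scikit-learn are installed (the toy frame, column names, and variable names are illustrative, not from this PR). Supplying empty-array meta up front means dask-ml does not have to infer the output type by running predict on a placeholder frame, which is the step that can break on nullable dtypes:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_ml.wrappers import ParallelPostFit
from sklearn.linear_model import LogisticRegression

# Illustrative training data
df = pd.DataFrame({"x": [0.0, 1.0, 2.0, 3.0], "y": [0, 0, 1, 1]})
estimator = LogisticRegression().fit(df[["x"]], df["y"])

# Empty-array meta declares the predict output type explicitly,
# mirroring the sklearn branch added in create_model.py above.
output_meta = np.array([])
model = ParallelPostFit(
    estimator=estimator,
    predict_meta=output_meta,
    predict_proba_meta=output_meta,
    transform_meta=output_meta,
)

ddf = dd.from_pandas(df[["x"]], npartitions=2)
print(model.predict(ddf).compute())  # array of 0/1 predictions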
67 changes: 66 additions & 1 deletion dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,15 @@
import uuid
from typing import TYPE_CHECKING

import numpy as np

try:
from dask_ml.wrappers import ParallelPostFit

dask_ml_flag = True
except ImportError: # pragma: no cover
dask_ml_flag = False

from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin

@@ -59,7 +68,27 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:

model, training_columns = context.schema[schema_name].models[model_name]
df = context.sql(sql_select)
prediction = model.predict(df[training_columns])
part = df[training_columns]

if dask_ml_flag:
if isinstance(model, ParallelPostFit):
output_meta = model.predict_meta
if output_meta is None:
output_meta = model.estimator.predict(part._meta_nonempty)
try:
prediction = part.map_partitions(
self._predict,
output_meta,
model.estimator,
meta=output_meta,
)
except ValueError:
prediction = model.predict(part)
else:
prediction = model.predict(part)
else:
prediction = model.predict(part)

predicted_df = df.assign(target=prediction)

# Create a temporary context, which includes the
@@ -79,3 +108,39 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
dc = DataContainer(predicted_df, cc)

return dc

def _predict(self, part, predict_meta, estimator):
if part.shape[0] == 0 and predict_meta is not None:
empty_output = self.handle_empty_partitions(predict_meta)
if empty_output is not None:
return empty_output
return estimator.predict(part)

def handle_empty_partitions(self, output_meta):
if hasattr(output_meta, "__array_function__"):
if len(output_meta.shape) == 1:
shape = 0
else:
shape = list(output_meta.shape)
shape[0] = 0
ar = np.zeros(
shape=shape,
dtype=output_meta.dtype,
like=output_meta,
)
return ar
elif "scipy.sparse" in type(output_meta).__module__:
# sparse matrices don't support `like` because
# __array_function__ is not implemented; see
# https://github.com/scipy/scipy/issues/10362
# Note: the below works for both cupy and scipy sparse matrices
if len(output_meta.shape) == 1:
shape = 0
else:
shape = list(output_meta.shape)
shape[0] = 0

ar = type(output_meta)(shape, dtype=output_meta.dtype)
return ar
elif hasattr(output_meta, "iloc"):
return output_meta.iloc[:0, :]
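A standalone sketch of the empty-partition handling above (the helper name and toy inputs are illustrative; the branching mirrors handle_empty_partitions): array-like meta yields a zero-row array of the same dtype, while frame-like meta is sliced to zero rows.

import numpy as np
import pandas as pd

def empty_like_meta(output_meta):
    # Array-likes (numpy, cupy) implement __array_function__,
    # so `like=` dispatches to the matching array library.
    if hasattr(output_meta, "__array_function__"):
        if len(output_meta.shape) == 1:
            shape = 0
        else:
            shape = [0, *output_meta.shape[1:]]
        return np.zeros(shape=shape, dtype=output_meta.dtype, like=output_meta)
    # pandas objects don't implement __array_function__, so
    # DataFrame meta falls through to a zero-row slice.
    if hasattr(output_meta, "iloc"):
        return output_meta.iloc[:0, :]
    return None

print(empty_like_meta(np.array([1, 2, 3])).shape)         # (0,)
print(empty_like_meta(pd.DataFrame({"a": [1.0]})).shape)  # (0, 1)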
76 changes: 76 additions & 0 deletions tests/integration/test_model.py
@@ -924,3 +924,79 @@ def test_experiment_automl_regressor(c, client, training_df):
), "Best model was not registered"

check_trained_model(c, "my_automl_exp2")


# TODO - many ML tests fail on clusters without sklearn - can we avoid this?
@skip_if_external_scheduler
def test_predict_with_nullable_types(c):
df = pd.DataFrame(
{
"rough_day_of_year": [0, 1, 2, 3],
"prev_day_inches_rained": [0.0, 1.0, 2.0, 3.0],
"rained": [False, False, False, True],
}
)
c.create_table("train_set", df)

model_class = "'sklearn.linear_model.LogisticRegression'"

c.sql(
f"""
CREATE OR REPLACE MODEL model WITH (
model_class = {model_class},
wrap_predict = True,
wrap_fit = False,
target_column = 'rained'
) AS (
SELECT *
FROM train_set
)
"""
)

expected = c.sql(
"""
SELECT * FROM PREDICT(
MODEL model,
SELECT * FROM train_set
)
"""
)

df = pd.DataFrame(
{
"rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"),
"prev_day_inches_rained": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"),
"rained": pd.Series([False, False, False, True]),
}
)
c.create_table("train_set", df)

c.sql(
f"""
CREATE OR REPLACE MODEL model WITH (
model_class = {model_class},
wrap_predict = True,
wrap_fit = False,
target_column = 'rained'
) AS (
SELECT *
FROM train_set
)
"""
)

result = c.sql(
"""
SELECT * FROM PREDICT(
MODEL model,
SELECT * FROM train_set
)
"""
)

assert_eq(
expected,
result,
check_dtype=False,
)
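To see the failure modes this test guards against in isolation, here is a hedged sketch (scikit-learn only; the frame and names are illustrative): scikit-learn refuses zero-row input, which is exactly what an empty Dask partition hands to predict, and nullable extension dtypes can fail the array conversion depending on the pandas/scikit-learn versions in use.

import pandas as pd
from sklearn.linear_model import LogisticRegression

X = pd.DataFrame({"x": [0.0, 1.0, 2.0, 3.0]})
y = pd.Series([False, False, True, True])
est = LogisticRegression().fit(X, y)

# Zero-row input: sklearn raises, hence the handle_empty_partitions fallback.
try:
    est.predict(X.iloc[:0])
except ValueError as e:
    print("empty partition:", e)

# Nullable extension dtype: the conversion may fail on some version
# combinations, which the SQL-level test above exercises end to end.
try:
    print(est.predict(X.astype({"x": "Float32"})))
except Exception as e:
    print("nullable dtype:", e)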