Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sklearnex.BasicStatistics API for CSR inputs on GPU and a test for it #2253

Merged
merged 18 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion onedal/basic_statistics/basic_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ def fit(self, data, sample_weight=None, queue=None):
sample_weight = _check_array(sample_weight, ensure_2d=False)

is_single_dim = data.ndim == 1
data_table, weights_table = to_table(data, sample_weight, queue=queue)

data_table = to_table(data, queue=queue)
weights_table = (
to_table(sample_weight, queue=queue)
if sample_weight is not None
else to_table(None)
)

dtype = data_table.dtype
raw_result = self._compute_raw(data_table, weights_table, policy, dtype, is_csr)
Expand Down
16 changes: 14 additions & 2 deletions onedal/basic_statistics/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,23 @@
"max": (lambda X: np.max(X, axis=0), (1e-7, 1e-7)),
"mean": (lambda X: np.mean(X, axis=0), (5e-7, 1e-7)),
"variance": (lambda X: np.var(X, axis=0), (2e-3, 2e-3)),
"variation": (lambda X: np.std(X, axis=0) / np.mean(X, axis=0), (5e-2, 5e-2)),
"variation": (
lambda X: (
np.std(X, axis=0) / np.mean(X, axis=0)
if np.all(np.mean(X, axis=0))
else np.array(
[
x / y if y != 0 else np.nan
for x, y in zip(np.std(X, axis=0), np.mean(X, axis=0))
]
)
),
(0.1, 0.1),
),
"sum_squares": (lambda X: np.sum(np.square(X), axis=0), (2e-4, 1e-7)),
"sum_squares_centered": (
lambda X: np.sum(np.square(X - np.mean(X, axis=0)), axis=0),
(2e-4, 1e-7),
(1e-2, 1e-7),
),
"standard_deviation": (lambda X: np.std(X, axis=0), (2e-3, 2e-3)),
"second_order_raw_moment": (lambda X: np.mean(np.square(X), axis=0), (1e-6, 1e-7)),
Expand Down
41 changes: 36 additions & 5 deletions sklearnex/basic_statistics/basic_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import warnings

import numpy as np
from scipy.sparse import issparse
from sklearn.base import BaseEstimator
from sklearn.utils import check_array
from sklearn.utils.validation import _check_sample_weight

from daal4py.sklearn._n_jobs_support import control_n_jobs
from daal4py.sklearn._utils import sklearn_check_version
from daal4py.sklearn._utils import daal_check_version, sklearn_check_version
from onedal.basic_statistics import BasicStatistics as onedal_BasicStatistics
from onedal.utils import _is_csr

from .._device_offload import dispatch
from .._utils import IntelEstimator, PatchingConditionsChain
Expand Down Expand Up @@ -166,21 +168,50 @@
f"'{self.__class__.__name__}' object has no attribute '{attr}'"
)

def _onedal_cpu_supported(self, method_name, *data):
    """Report CPU offload support for *method_name*.

    No restrictions apply on CPU, so the returned
    PatchingConditionsChain carries no failing conditions.
    """
    patching_status = PatchingConditionsChain(
        f"sklearnex.basic_statistics.{self.__class__.__name__}.{method_name}"
    )
    return patching_status

def _onedal_gpu_supported(self, method_name, *data):
    """Report GPU offload support for *method_name* given the inputs.

    GPU offload is supported for dense data unconditionally, and for
    CSR sparse data only when oneDAL >= 2025.2.0 and no sample weights
    are supplied (weighted statistics on CSR are not implemented).
    """
    patching_status = PatchingConditionsChain(
        f"sklearnex.basic_statistics.{self.__class__.__name__}.{method_name}"
    )
    X, sample_weight = data

    # CSR requires a recent oneDAL; any non-sparse input is accepted.
    is_data_supported = (
        _is_csr(X) and daal_check_version((2025, "P", 200))
    ) or not issparse(X)

    # Weights are only allowed when the data is not sparse.
    is_sample_weight_supported = sample_weight is None or not issparse(X)

    patching_status.and_conditions(
        [
            (
                is_sample_weight_supported,
                "Sample weights are not supported for CSR data format",
            ),
            (
                is_data_supported,
                "Supported data formats: Dense, CSR (oneDAL version >= 2025.2.0).",
            ),
        ]
    )
    return patching_status

def _onedal_fit(self, X, sample_weight=None, queue=None):
if sklearn_check_version("1.2"):
self._validate_params()

if sklearn_check_version("1.0"):
X = validate_data(self, X, dtype=[np.float64, np.float32], ensure_2d=False)
X = validate_data(
self,
X,
dtype=[np.float64, np.float32],
ensure_2d=False,
accept_sparse="csr",
)
else:
X = check_array(X, dtype=[np.float64, np.float32])

Expand Down
166 changes: 150 additions & 16 deletions sklearnex/basic_statistics/tests/test_basic_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,30 @@
import numpy as np
import pytest
from numpy.testing import assert_allclose
from scipy import sparse as sp

from daal4py.sklearn._utils import daal_check_version
from onedal.basic_statistics.tests.utils import options_and_tests
from onedal.tests.utils._dataframes_support import (
_convert_to_dataframe,
get_dataframes_and_queues,
get_queues,
)
from sklearnex import config_context
from sklearnex.basic_statistics import BasicStatistics
from sklearnex.tests.utils import gen_sparse_dataset


def compute_sparse_result(X_sparse, options, queue):
    """Fit BasicStatistics on sparse input, offloading to GPU when *queue* targets one."""
    use_gpu = queue is not None and queue.sycl_device.is_gpu
    if use_gpu:
        # Route the computation to the GPU through sklearnex's config context.
        with config_context(target_offload="gpu"):
            return BasicStatistics(result_options=options).fit(X_sparse)
    return BasicStatistics(result_options=options).fit(X_sparse)


@pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues())
Expand All @@ -41,19 +57,19 @@
expected_min = np.array([0, 0])
expected_max = np.array([1, 1])

assert_allclose(expected_mean, result.mean)
assert_allclose(expected_max, result.max)
assert_allclose(expected_min, result.min)
assert_allclose(expected_mean, result.mean_)
assert_allclose(expected_max, result.max_)
assert_allclose(expected_min, result.min_)

result = BasicStatistics().fit(X_df, sample_weight=weights_df)

expected_weighted_mean = np.array([0.25, 0.25])
expected_weighted_min = np.array([0, 0])
expected_weighted_max = np.array([0.5, 0.5])

assert_allclose(expected_weighted_mean, result.mean)
assert_allclose(expected_weighted_min, result.min)
assert_allclose(expected_weighted_max, result.max)
assert_allclose(expected_weighted_mean, result.mean_)
assert_allclose(expected_weighted_min, result.min_)
assert_allclose(expected_weighted_max, result.max_)


@pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues())
Expand All @@ -78,16 +94,16 @@
expected_weighted_mean = np.array([0.25, 0.25])
expected_weighted_min = np.array([0, 0])
expected_weighted_max = np.array([0.5, 0.5])
assert_allclose(expected_weighted_mean, result.mean)
assert_allclose(expected_weighted_max, result.max)
assert_allclose(expected_weighted_min, result.min)
assert_allclose(expected_weighted_mean, result.mean_)
assert_allclose(expected_weighted_max, result.max_)
assert_allclose(expected_weighted_min, result.min_)
else:
expected_mean = np.array([0.5, 0.5])
expected_min = np.array([0, 0])
expected_max = np.array([1, 1])
assert_allclose(expected_mean, result.mean)
assert_allclose(expected_max, result.max)
assert_allclose(expected_min, result.min)
assert_allclose(expected_mean, result.mean_)
assert_allclose(expected_max, result.max_)
assert_allclose(expected_min, result.min_)


@pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues())
Expand Down Expand Up @@ -117,7 +133,7 @@
else:
result = basicstat.fit(X_df)

res = getattr(result, result_option)
res = getattr(result, result_option + "_")
if weighted:
weighted_data = np.diag(weights) @ X
gtr = function(weighted_data)
Expand All @@ -128,6 +144,45 @@
assert_allclose(gtr, res, atol=tol)


@pytest.mark.parametrize("queue", get_queues())
@pytest.mark.parametrize("result_option", options_and_tests.keys())
@pytest.mark.parametrize("row_count", [1000, 10000])
@pytest.mark.parametrize("column_count", [10, 100])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_single_option_on_random_sparse_data(
    queue, result_option, row_count, column_count, dtype
):
    """Compare one requested statistic on random CSR data against a dense numpy reference."""
    on_cpu = queue is None or queue.sycl_device.is_cpu
    if on_cpu and result_option in ("max", "sum_squares"):
        pytest.skip("max and sum_squares computations are buggy on CPU")

    reference_fn, (fp32tol, fp64tol) = options_and_tests[result_option]

    rng = np.random.default_rng(77)
    X_sparse = gen_sparse_dataset(
        row_count,
        column_count,
        density=0.01,
        format="csr",
        dtype=dtype,
        random_state=rng,
    )

    result = compute_sparse_result(X_sparse, result_option, queue)
    computed = getattr(result, result_option + "_")
    expected = reference_fn(X_sparse.toarray())

    atol = fp32tol if computed.dtype == np.float32 else fp64tol
    assert_allclose(expected, computed, atol=atol)


@pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues())
@pytest.mark.parametrize("row_count", [100, 1000])
@pytest.mark.parametrize("column_count", [10, 100])
Expand All @@ -152,7 +207,7 @@
else:
result = basicstat.fit(X_df)

res_mean, res_max, res_sum = result.mean, result.max, result.sum
res_mean, res_max, res_sum = result.mean_, result.max_, result.sum_
if weighted:
weighted_data = np.diag(weights) @ X
gtr_mean, gtr_max, gtr_sum = (
Expand All @@ -173,6 +228,48 @@
assert_allclose(gtr_sum, res_sum, atol=tol)


@pytest.mark.parametrize("queue", get_queues())
@pytest.mark.parametrize("row_count", [100, 1000])
@pytest.mark.parametrize("column_count", [10, 100])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_multiple_options_on_random_sparse_data(queue, row_count, column_count, dtype):
    """Fit once with several result options on CSR data and verify each against numpy."""
    seed = 77
    gen = np.random.default_rng(seed)

    X_sparse = gen_sparse_dataset(
        row_count,
        column_count,
        density=0.05,
        format="csr",
        dtype=dtype,
        random_state=gen,
    )
    X_dense = X_sparse.toarray()

    options = [
        "sum",
        "min",
        "mean",
        "standard_deviation",
        "variance",
        "second_order_raw_moment",
    ]

    result = compute_sparse_result(X_sparse, options, queue)

    # Iterate the requested options directly instead of filtering the full
    # options_and_tests table with `if not result_option in options: continue`.
    for result_option in options:
        function, (fp32tol, fp64tol) = options_and_tests[result_option]
        res = getattr(result, result_option + "_")
        gtr = function(X_dense)
        tol = fp32tol if res.dtype == np.float32 else fp64tol
        assert_allclose(gtr, res, atol=tol)


@pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues())
@pytest.mark.parametrize("row_count", [100, 1000])
@pytest.mark.parametrize("column_count", [10, 100])
Expand Down Expand Up @@ -203,7 +300,7 @@
for result_option in options_and_tests:
function, tols = options_and_tests[result_option]
fp32tol, fp64tol = tols
res = getattr(result, result_option)
res = getattr(result, result_option + "_")
if weighted:
gtr = function(weighted_data)
else:
Expand All @@ -212,6 +309,43 @@
assert_allclose(gtr, res, atol=tol)


@pytest.mark.parametrize("queue", get_queues())
@pytest.mark.parametrize("row_count", [100, 1000])
@pytest.mark.parametrize("column_count", [10, 100])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_all_option_on_random_sparse_data(queue, row_count, column_count, dtype):
    """Fit with result_options="all" on CSR data and verify every statistic against numpy."""
    seed = 77
    gen = np.random.default_rng(seed)

    X_sparse = gen_sparse_dataset(
        row_count,
        column_count,
        density=0.05,
        format="csr",
        dtype=dtype,
        random_state=gen,
    )
    X_dense = X_sparse.toarray()

    result = compute_sparse_result(X_sparse, "all", queue)

    on_cpu = queue is None or queue.sycl_device.is_cpu
    for result_option in options_and_tests:
        if on_cpu and result_option in ("max", "sum_squares"):
            # TODO: here is a bug in oneDAL's max and sum_squares computations on CPU
            continue
        function, (fp32tol, fp64tol) = options_and_tests[result_option]
        res = getattr(result, result_option + "_")
        gtr = function(X_dense)
        tol = fp32tol if res.dtype == np.float32 else fp64tol
        assert_allclose(gtr, res, atol=tol)


@pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues())
@pytest.mark.parametrize("result_option", options_and_tests.keys())
@pytest.mark.parametrize("data_size", [100, 1000])
Expand All @@ -238,7 +372,7 @@
else:
result = basicstat.fit(X_df)

res = getattr(result, result_option)
res = getattr(result, result_option + "_")
if weighted:
weighted_data = weights * X
gtr = function(weighted_data)
Expand Down
7 changes: 7 additions & 0 deletions sklearnex/tests/test_run_to_run_stability.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import daal4py as d4p
from daal4py.sklearn._utils import daal_check_version
from onedal.tests.utils._dataframes_support import _as_numpy, get_dataframes_and_queues
from sklearnex.basic_statistics import BasicStatistics
from sklearnex.cluster import DBSCAN, KMeans
from sklearnex.decomposition import PCA
from sklearnex.metrics import pairwise_distances, roc_auc_score
Expand Down Expand Up @@ -117,6 +118,12 @@ def _run_test(estimator, method, datasets):


_sparse_instances = [SVC()]
if daal_check_version((2025, "P", 200)): # Test for >= 2025.2.0
_sparse_instances.extend(
[
BasicStatistics(result_options=["sum", "min"]),
]
)
if daal_check_version((2024, "P", 700)): # Test for > 2024.7.0
_sparse_instances.extend(
[
Expand Down
Loading
Loading