Skip to content

Commit 2159d03

Browse files
ethanglaser and icfaust authored
chore: initial refactoring of incremental spmd algos (#2248)
* Refactor incremental spmd algos
* Clear spmd impls, specify non-spmd get_policy in base cls
* black
* minor bs fix
* apply changes to PCA predict and add transform
* add comments
* tuple indices safeguarding
* incremental bs fit fixes
* restore previous 2, added to raw inputs instead
* Update onedal/decomposition/pca.py

  Co-authored-by: Ian Faust <[email protected]>
* blacked

---------

Co-authored-by: Ian Faust <[email protected]>
1 parent 9b69074 commit 2159d03

File tree

10 files changed: +65 -308 lines changed

onedal/basic_statistics/incremental_basic_statistics.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ def __init__(self, result_options="all"):
7171

7272
def _reset(self):
7373
self._need_to_finalize = False
74-
self._partial_result = self._get_backend(
75-
"basic_statistics", None, "partial_compute_result"
74+
# Not supported with spmd policy so IncrementalBasicStatistics must be specified
75+
self._partial_result = IncrementalBasicStatistics._get_backend(
76+
IncrementalBasicStatistics, "basic_statistics", None, "partial_compute_result"
7677
)
7778

7879
def __getstate__(self):
@@ -105,7 +106,10 @@ def partial_fit(self, X, weights=None, queue=None):
105106
Returns the instance itself.
106107
"""
107108
self._queue = queue
108-
policy = self._get_policy(queue, X)
109+
# Not supported with spmd policy so IncrementalBasicStatistics must be specified
110+
policy = IncrementalBasicStatistics._get_policy(
111+
IncrementalBasicStatistics, queue, X
112+
)
109113

110114
X = _check_array(
111115
X, dtype=[np.float64, np.float32], ensure_2d=False, force_all_finite=False
@@ -123,7 +127,9 @@ def partial_fit(self, X, weights=None, queue=None):
123127
self._onedal_params = self._get_onedal_params(False, dtype=dtype)
124128

125129
X_table, weights_table = to_table(X, weights, queue=queue)
126-
self._partial_result = self._get_backend(
130+
# Not supported with spmd policy so IncrementalBasicStatistics must be specified
131+
self._partial_result = IncrementalBasicStatistics._get_backend(
132+
IncrementalBasicStatistics,
127133
"basic_statistics",
128134
None,
129135
"partial_compute",

onedal/covariance/incremental_covariance.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ def __init__(self, method="dense", bias=False, assume_centered=False):
5858

5959
def _reset(self):
6060
self._need_to_finalize = False
61-
self._partial_result = self._get_backend(
62-
"covariance", None, "partial_compute_result"
61+
# Not supported with spmd policy so IncrementalEmpiricalCovariance must be specified
62+
self._partial_result = IncrementalEmpiricalCovariance._get_backend(
63+
IncrementalEmpiricalCovariance, "covariance", None, "partial_compute_result"
6364
)
6465

6566
def __getstate__(self):
@@ -99,15 +100,20 @@ def partial_fit(self, X, y=None, queue=None):
99100

100101
self._queue = queue
101102

102-
policy = self._get_policy(queue, X)
103+
# Not supported with spmd policy so IncrementalEmpiricalCovariance must be specified
104+
policy = IncrementalEmpiricalCovariance._get_policy(
105+
IncrementalEmpiricalCovariance, queue, X
106+
)
103107

104108
X_table = to_table(X, queue=queue)
105109

106110
if not hasattr(self, "_dtype"):
107111
self._dtype = X_table.dtype
108112

109113
params = self._get_onedal_params(self._dtype)
110-
self._partial_result = self._get_backend(
114+
# Not supported with spmd policy so IncrementalEmpiricalCovariance must be specified
115+
self._partial_result = IncrementalEmpiricalCovariance._get_backend(
116+
IncrementalEmpiricalCovariance,
111117
"covariance",
112118
None,
113119
"partial_compute",

onedal/decomposition/incremental_pca.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ def __init__(
100100

101101
def _reset(self):
102102
self._need_to_finalize = False
103-
module = self._get_backend("decomposition", "dim_reduction")
103+
# Not supported with spmd policy so IncrementalPCA must be specified
104+
module = IncrementalPCA._get_backend(
105+
IncrementalPCA, "decomposition", "dim_reduction"
106+
)
104107
if hasattr(self, "components_"):
105108
del self.components_
106109
self._partial_result = module.partial_train_result()
@@ -154,14 +157,17 @@ def partial_fit(self, X, queue):
154157

155158
self._queue = queue
156159

157-
policy = self._get_policy(queue, X)
160+
# Not supported with spmd policy so IncrementalPCA must be specified
161+
policy = IncrementalPCA._get_policy(IncrementalPCA, queue, X)
158162
X_table = to_table(X, queue=queue)
159163

160164
if not hasattr(self, "_dtype"):
161165
self._dtype = X_table.dtype
162166
self._params = self._get_onedal_params(X_table)
163167

164-
self._partial_result = self._get_backend(
168+
# Not supported with spmd policy so IncrementalPCA must be specified
169+
self._partial_result = IncrementalPCA._get_backend(
170+
IncrementalPCA,
165171
"decomposition",
166172
"dim_reduction",
167173
"partial_train",

onedal/decomposition/pca.py

+16-4
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ def _compute_noise_variance(self, n_components, n_sf_min):
119119
return 0.0
120120

121121
def _create_model(self):
122-
m = self._get_backend("decomposition", "dim_reduction", "model")
122+
# Not supported with spmd policy so BasePCA must be specified
123+
m = BasePCA._get_backend(BasePCA, "decomposition", "dim_reduction", "model")
123124
m.eigenvectors = to_table(self.components_)
124125
m.means = to_table(self.mean_)
125126
if self.whiten:
@@ -128,16 +129,27 @@ def _create_model(self):
128129
return m
129130

130131
def predict(self, X, queue=None):
131-
policy = self._get_policy(queue, X)
132+
# Not supported with spmd policy so BasePCA must be specified
133+
policy = BasePCA._get_policy(BasePCA, queue, X)
132134
model = self._create_model()
133135
X_table = to_table(X, queue=queue)
134136
params = self._get_onedal_params(X_table, stage="predict")
135137

136-
result = self._get_backend(
137-
"decomposition", "dim_reduction", "infer", policy, params, model, X_table
138+
# Not supported with spmd policy so BasePCA must be specified
139+
result = BasePCA._get_backend(
140+
BasePCA,
141+
"decomposition",
142+
"dim_reduction",
143+
"infer",
144+
policy,
145+
params,
146+
model,
147+
X_table,
138148
)
139149
return from_table(result.transformed_data)
140150

151+
transform = predict
152+
141153

142154
class PCA(BasePCA):
143155

onedal/decomposition/tests/test_incremental_pca.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def test_on_gold_data(queue, is_deterministic, whiten, num_blocks, dtype):
4040

4141
result = incpca.finalize_fit()
4242

43-
transformed_data = incpca.predict(X, queue=queue)
43+
transformed_data = incpca.transform(X, queue=queue)
4444

4545
expected_n_components_ = 2
4646
expected_components_ = np.array([[0.83849224, 0.54491354], [-0.54491354, 0.83849224]])
@@ -128,7 +128,7 @@ def test_on_random_data(
128128

129129
incpca.finalize_fit()
130130

131-
transformed_data = incpca.predict(X, queue=queue)
131+
transformed_data = incpca.transform(X, queue=queue)
132132
tol = 3e-3 if transformed_data.dtype == np.float32 else 2e-6
133133

134134
n_components = incpca.n_components_

onedal/linear_model/incremental_linear_model.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,12 @@ def __init__(self, fit_intercept=True, copy_X=False, algorithm="norm_eq"):
4848

4949
def _reset(self):
5050
self._need_to_finalize = False
51-
self._partial_result = self._get_backend(
52-
"linear_model", "regression", "partial_train_result"
51+
# Not supported with spmd policy so IncrementalLinearRegression must be specified
52+
self._partial_result = IncrementalLinearRegression._get_backend(
53+
IncrementalLinearRegression,
54+
"linear_model",
55+
"regression",
56+
"partial_train_result",
5357
)
5458

5559
def __getstate__(self):
@@ -84,10 +88,16 @@ def partial_fit(self, X, y, queue=None):
8488
self : object
8589
Returns the instance itself.
8690
"""
87-
module = self._get_backend("linear_model", "regression")
91+
# Not supported with spmd policy so IncrementalLinearRegression must be specified
92+
module = IncrementalLinearRegression._get_backend(
93+
IncrementalLinearRegression, "linear_model", "regression"
94+
)
8895

8996
self._queue = queue
90-
policy = self._get_policy(queue, X)
97+
# Not supported with spmd policy so IncrementalLinearRegression must be specified
98+
policy = IncrementalLinearRegression._get_policy(
99+
IncrementalLinearRegression, queue, X
100+
)
91101

92102
X, y = _check_X_y(
93103
X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False

onedal/spmd/basic_statistics/incremental_basic_statistics.py

+1-48
Original file line numberDiff line numberDiff line change
@@ -14,58 +14,11 @@
1414
# limitations under the License.
1515
# ==============================================================================
1616

17-
from daal4py.sklearn._utils import get_dtype
18-
1917
from ...basic_statistics import (
2018
IncrementalBasicStatistics as base_IncrementalBasicStatistics,
2119
)
22-
from ...datatypes import to_table
2320
from .._base import BaseEstimatorSPMD
2421

2522

2623
class IncrementalBasicStatistics(BaseEstimatorSPMD, base_IncrementalBasicStatistics):
27-
def _reset(self):
28-
self._need_to_finalize = False
29-
self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend(
30-
"basic_statistics", None, "partial_compute_result"
31-
)
32-
33-
def partial_fit(self, X, weights=None, queue=None):
34-
"""
35-
Computes partial data for basic statistics
36-
from data batch X and saves it to `_partial_result`.
37-
38-
Parameters
39-
----------
40-
X : array-like of shape (n_samples, n_features)
41-
Training data batch, where `n_samples` is the number of samples
42-
in the batch, and `n_features` is the number of features.
43-
44-
queue : dpctl.SyclQueue
45-
If not None, use this queue for computations.
46-
47-
Returns
48-
-------
49-
self : object
50-
Returns the instance itself.
51-
"""
52-
self._queue = queue
53-
policy = super(base_IncrementalBasicStatistics, self)._get_policy(queue, X)
54-
X_table, weights_table = to_table(X, weights, queue=queue)
55-
56-
if not hasattr(self, "_onedal_params"):
57-
self._onedal_params = self._get_onedal_params(False, dtype=X_table.dtype)
58-
59-
self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend(
60-
"basic_statistics",
61-
None,
62-
"partial_compute",
63-
policy,
64-
self._onedal_params,
65-
self._partial_result,
66-
X_table,
67-
weights_table,
68-
)
69-
70-
self._need_to_finalize = True
71-
return self
24+
pass

onedal/spmd/covariance/incremental_covariance.py

+1-58
Original file line numberDiff line numberDiff line change
@@ -14,70 +14,13 @@
1414
# limitations under the License.
1515
# ==============================================================================
1616

17-
import numpy as np
18-
19-
from daal4py.sklearn._utils import get_dtype
20-
2117
from ...covariance import (
2218
IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance,
2319
)
24-
from ...datatypes import to_table
25-
from ...utils import _check_array
2620
from .._base import BaseEstimatorSPMD
2721

2822

2923
class IncrementalEmpiricalCovariance(
3024
BaseEstimatorSPMD, base_IncrementalEmpiricalCovariance
3125
):
32-
def _reset(self):
33-
self._need_to_finalize = False
34-
self._partial_result = super(
35-
base_IncrementalEmpiricalCovariance, self
36-
)._get_backend("covariance", None, "partial_compute_result")
37-
38-
def partial_fit(self, X, y=None, queue=None):
39-
"""
40-
Computes partial data for the covariance matrix
41-
from data batch X and saves it to `_partial_result`.
42-
43-
Parameters
44-
----------
45-
X : array-like of shape (n_samples, n_features)
46-
Training data batch, where `n_samples` is the number of samples
47-
in the batch, and `n_features` is the number of features.
48-
49-
y : Ignored
50-
Not used, present for API consistency by convention.
51-
52-
queue : dpctl.SyclQueue
53-
If not None, use this queue for computations.
54-
55-
Returns
56-
-------
57-
self : object
58-
Returns the instance itself.
59-
"""
60-
X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True)
61-
62-
self._queue = queue
63-
64-
policy = super(base_IncrementalEmpiricalCovariance, self)._get_policy(queue, X)
65-
66-
X_table = to_table(X, queue=queue)
67-
68-
if not hasattr(self, "_dtype"):
69-
self._dtype = X_table.dtype
70-
71-
params = self._get_onedal_params(self._dtype)
72-
self._partial_result = super(
73-
base_IncrementalEmpiricalCovariance, self
74-
)._get_backend(
75-
"covariance",
76-
None,
77-
"partial_compute",
78-
policy,
79-
params,
80-
self._partial_result,
81-
X_table,
82-
)
83-
self._need_to_finalize = True
26+
pass

0 commit comments

Comments
 (0)