From 61124579e3843e40c54c409e182bcdc3dea1dcdc Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Wed, 8 Jan 2025 23:13:03 -0800 Subject: [PATCH 1/9] Refactor incremental spmd algos --- .../incremental_basic_statistics.py | 50 +-------- .../spmd/covariance/incremental_covariance.py | 60 +--------- onedal/spmd/decomposition/incremental_pca.py | 105 +----------------- .../linear_model/incremental_linear_model.py | 76 +------------ 4 files changed, 12 insertions(+), 279 deletions(-) diff --git a/onedal/spmd/basic_statistics/incremental_basic_statistics.py b/onedal/spmd/basic_statistics/incremental_basic_statistics.py index fe4e57b199..d475c1faa7 100644 --- a/onedal/spmd/basic_statistics/incremental_basic_statistics.py +++ b/onedal/spmd/basic_statistics/incremental_basic_statistics.py @@ -14,58 +14,14 @@ # limitations under the License. # ============================================================================== -from daal4py.sklearn._utils import get_dtype - from ...basic_statistics import ( IncrementalBasicStatistics as base_IncrementalBasicStatistics, ) -from ...datatypes import to_table +from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalBasicStatistics(BaseEstimatorSPMD, base_IncrementalBasicStatistics): - def _reset(self): - self._need_to_finalize = False - self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend( - "basic_statistics", None, "partial_compute_result" - ) - + @support_input_format() def partial_fit(self, X, weights=None, queue=None): - """ - Computes partial data for basic statistics - from data batch X and saves it to `_partial_result`. - - Parameters - ---------- - X : array-like of shape (n_samples, n_features) - Training data batch, where `n_samples` is the number of samples - in the batch, and `n_features` is the number of features. - - queue : dpctl.SyclQueue - If not None, use this queue for computations. - - Returns - ------- - self : object - Returns the instance itself. - """ - self._queue = queue - policy = super(base_IncrementalBasicStatistics, self)._get_policy(queue, X) - X_table, weights_table = to_table(X, weights, queue=queue) - - if not hasattr(self, "_onedal_params"): - self._onedal_params = self._get_onedal_params(False, dtype=X_table.dtype) - - self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend( - "basic_statistics", - None, - "partial_compute", - policy, - self._onedal_params, - self._partial_result, - X_table, - weights_table, - ) - - self._need_to_finalize = True - return self + return super().partial_fit(X, weights=weights, queue=queue) diff --git a/onedal/spmd/covariance/incremental_covariance.py b/onedal/spmd/covariance/incremental_covariance.py index f8d25b2a08..dd5381f3bd 100644 --- a/onedal/spmd/covariance/incremental_covariance.py +++ b/onedal/spmd/covariance/incremental_covariance.py @@ -14,70 +14,16 @@ # limitations under the License. # ============================================================================== -import numpy as np - -from daal4py.sklearn._utils import get_dtype - from ...covariance import ( IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance, ) -from ...datatypes import to_table -from ...utils import _check_array +from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalEmpiricalCovariance( BaseEstimatorSPMD, base_IncrementalEmpiricalCovariance ): - def _reset(self): - self._need_to_finalize = False - self._partial_result = super( - base_IncrementalEmpiricalCovariance, self - )._get_backend("covariance", None, "partial_compute_result") - + @support_input_format() def partial_fit(self, X, y=None, queue=None): - """ - Computes partial data for the covariance matrix - from data batch X and saves it to `_partial_result`. - - Parameters - ---------- - X : array-like of shape (n_samples, n_features) - Training data batch, where `n_samples` is the number of samples - in the batch, and `n_features` is the number of features. - - y : Ignored - Not used, present for API consistency by convention. - - queue : dpctl.SyclQueue - If not None, use this queue for computations. - - Returns - ------- - self : object - Returns the instance itself. - """ - X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True) - - self._queue = queue - - policy = super(base_IncrementalEmpiricalCovariance, self)._get_policy(queue, X) - - X_table = to_table(X, queue=queue) - - if not hasattr(self, "_dtype"): - self._dtype = X_table.dtype - - params = self._get_onedal_params(self._dtype) - self._partial_result = super( - base_IncrementalEmpiricalCovariance, self - )._get_backend( - "covariance", - None, - "partial_compute", - policy, - params, - self._partial_result, - X_table, - ) - self._need_to_finalize = True + return super().partial_fit(X, queue=queue) diff --git a/onedal/spmd/decomposition/incremental_pca.py b/onedal/spmd/decomposition/incremental_pca.py index 76c3821d52..8708c26248 100644 --- a/onedal/spmd/decomposition/incremental_pca.py +++ b/onedal/spmd/decomposition/incremental_pca.py @@ -14,111 +14,12 @@ # limitations under the License. # ============================================================================== -from daal4py.sklearn._utils import get_dtype - -from ...datatypes import from_table, to_table from ...decomposition import IncrementalPCA as base_IncrementalPCA -from ...utils import _check_array +from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalPCA(BaseEstimatorSPMD, base_IncrementalPCA): - """ - Distributed incremental estimator for PCA based on oneDAL implementation. - Allows for distributed PCA computation if data is split into batches. - - API is the same as for `onedal.decomposition.IncrementalPCA` - """ - - def _reset(self): - self._need_to_finalize = False - self._partial_result = super(base_IncrementalPCA, self)._get_backend( - "decomposition", "dim_reduction", "partial_train_result" - ) - if hasattr(self, "components_"): - del self.components_ - + @support_input_format() def partial_fit(self, X, y=None, queue=None): - """Incremental fit with X. All of X is processed as a single batch. - - Parameters - ---------- - X : array-like of shape (n_samples, n_features) - Training data, where `n_samples` is the number of samples and - `n_features` is the number of features. - - y : Ignored - Not used, present for API consistency by convention. - - Returns - ------- - self : object - Returns the instance itself. - """ - X = _check_array(X) - n_samples, n_features = X.shape - - first_pass = not hasattr(self, "components_") - if first_pass: - self.components_ = None - self.n_samples_seen_ = n_samples - self.n_features_in_ = n_features - else: - self.n_samples_seen_ += n_samples - - if self.n_components is None: - if self.components_ is None: - self.n_components_ = min(n_samples, n_features) - else: - self.n_components_ = self.components_.shape[0] - else: - self.n_components_ = self.n_components - - self._queue = queue - - policy = super(base_IncrementalPCA, self)._get_policy(queue, X) - X_table = to_table(X, queue=queue) - - if not hasattr(self, "_dtype"): - self._dtype = X_table.dtype - self._params = self._get_onedal_params(X_table) - - self._partial_result = super(base_IncrementalPCA, self)._get_backend( - "decomposition", - "dim_reduction", - "partial_train", - policy, - self._params, - self._partial_result, - X_table, - ) - self._need_to_finalize = True - return self - - def _create_model(self): - m = super(base_IncrementalPCA, self)._get_backend( - "decomposition", "dim_reduction", "model" - ) - m.eigenvectors = to_table(self.components_) - m.means = to_table(self.mean_) - if self.whiten: - m.eigenvalues = to_table(self.explained_variance_) - self._onedal_model = m - return m - - def predict(self, X, queue=None): - policy = super(base_IncrementalPCA, self)._get_policy(queue, X) - model = self._create_model() - X = to_table(X, queue=queue) - params = self._get_onedal_params(X, stage="predict") - - result = super(base_IncrementalPCA, self)._get_backend( - "decomposition", - "dim_reduction", - "infer", - policy, - params, - model, - X, - ) - return from_table(result.transformed_data) + return super().partial_fit(X, queue=queue) diff --git a/onedal/spmd/linear_model/incremental_linear_model.py b/onedal/spmd/linear_model/incremental_linear_model.py index d3846bc82a..6b0f205e6d 100644 --- a/onedal/spmd/linear_model/incremental_linear_model.py +++ b/onedal/spmd/linear_model/incremental_linear_model.py @@ -14,84 +14,14 @@ # limitations under the License. # ============================================================================== -import numpy as np - -from daal4py.sklearn._utils import get_dtype - -from ...common.hyperparameters import get_hyperparameters -from ...datatypes import to_table from ...linear_model import ( IncrementalLinearRegression as base_IncrementalLinearRegression, ) -from ...utils import _check_X_y, _num_features +from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalLinearRegression(BaseEstimatorSPMD, base_IncrementalLinearRegression): - """ - Distributed incremental Linear Regression oneDAL implementation. - - API is the same as for `onedal.linear_model.IncrementalLinearRegression`. - """ - - def _reset(self): - self._partial_result = super(base_IncrementalLinearRegression, self)._get_backend( - "linear_model", "regression", "partial_train_result" - ) - + @support_input_format() def partial_fit(self, X, y, queue=None): - """ - Computes partial data for linear regression - from data batch X and saves it to `_partial_result`. - Parameters - ---------- - X : array-like of shape (n_samples, n_features) - Training data batch, where `n_samples` is the number of samples - in the batch, and `n_features` is the number of features. - - y: array-like of shape (n_samples,) or (n_samples, n_targets) in - case of multiple targets - Responses for training data. - - queue : dpctl.SyclQueue - If not None, use this queue for computations. - Returns - ------- - self : object - Returns the instance itself. - """ - module = super(base_IncrementalLinearRegression, self)._get_backend( - "linear_model", "regression" - ) - - self._queue = queue - policy = super(base_IncrementalLinearRegression, self)._get_policy(queue, X) - - X, y = _check_X_y( - X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False - ) - - X_table, y_table = to_table(X, y, queue=queue) - - if not hasattr(self, "_dtype"): - self._dtype = X_table.dtype - self._params = self._get_onedal_params(self._dtype) - - y = np.asarray(y, dtype=self._dtype) - - self.n_features_in_ = _num_features(X, fallback_1d=True) - - hparams = get_hyperparameters("linear_regression", "train") - if hparams is not None and not hparams.is_default: - self._partial_result = module.partial_train( - policy, - self._params, - hparams.backend, - self._partial_result, - X_table, - y_table, - ) - else: - self._partial_result = module.partial_train( - policy, self._params, self._partial_result, X_table, y_table - ) + return super().partial_fit(X, y, queue=queue) From 9c72d9c89ee4ec92b3581cc7bc2a8b168173678c Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Fri, 10 Jan 2025 11:42:53 -0800 Subject: [PATCH 2/9] Clear spmd impls, specify non-spmd get_policy in base cls --- onedal/basic_statistics/incremental_basic_statistics.py | 9 +++++---- onedal/covariance/incremental_covariance.py | 9 +++++---- onedal/decomposition/incremental_pca.py | 7 ++++--- onedal/linear_model/incremental_linear_model.py | 8 ++++---- .../basic_statistics/incremental_basic_statistics.py | 5 +---- onedal/spmd/covariance/incremental_covariance.py | 5 +---- onedal/spmd/decomposition/incremental_pca.py | 5 +---- onedal/spmd/linear_model/incremental_linear_model.py | 5 +---- 8 files changed, 22 insertions(+), 31 deletions(-) diff --git a/onedal/basic_statistics/incremental_basic_statistics.py b/onedal/basic_statistics/incremental_basic_statistics.py index b98161ce59..d41da916de 100644 --- a/onedal/basic_statistics/incremental_basic_statistics.py +++ b/onedal/basic_statistics/incremental_basic_statistics.py @@ -71,8 +71,8 @@ def __init__(self, result_options="all"): def _reset(self): self._need_to_finalize = False - self._partial_result = self._get_backend( - "basic_statistics", None, "partial_compute_result" + self._partial_result = IncrementalBasicStatistics._get_backend( + IncrementalBasicStatistics, "basic_statistics", None, "partial_compute_result" ) def __getstate__(self): @@ -105,7 +105,7 @@ def partial_fit(self, X, weights=None, queue=None): Returns the instance itself. """ self._queue = queue - policy = self._get_policy(queue, X) + policy = IncrementalBasicStatistics._get_policy(IncrementalBasicStatistics, queue, X) X = _check_array( X, dtype=[np.float64, np.float32], ensure_2d=False, force_all_finite=False @@ -123,7 +123,8 @@ def partial_fit(self, X, weights=None, queue=None): self._onedal_params = self._get_onedal_params(False, dtype=dtype) X_table, weights_table = to_table(X, weights, queue=queue) - self._partial_result = self._get_backend( + IncrementalBasicStatistics._partial_result = self._get_backend( + IncrementalBasicStatistics, "basic_statistics", None, "partial_compute", diff --git a/onedal/covariance/incremental_covariance.py b/onedal/covariance/incremental_covariance.py index b0bfb04e22..c05632ebb0 100644 --- a/onedal/covariance/incremental_covariance.py +++ b/onedal/covariance/incremental_covariance.py @@ -58,8 +58,8 @@ def __init__(self, method="dense", bias=False, assume_centered=False): def _reset(self): self._need_to_finalize = False - self._partial_result = self._get_backend( - "covariance", None, "partial_compute_result" + self._partial_result = IncrementalEmpiricalCovariance._get_backend( + IncrementalEmpiricalCovariance, "covariance", None, "partial_compute_result" ) def __getstate__(self): @@ -99,7 +99,7 @@ def partial_fit(self, X, y=None, queue=None): self._queue = queue - policy = self._get_policy(queue, X) + policy = IncrementalEmpiricalCovariance._get_policy(IncrementalEmpiricalCovariance, queue, X) X_table = to_table(X, queue=queue) @@ -107,7 +107,8 @@ def partial_fit(self, X, y=None, queue=None): self._dtype = X_table.dtype params = self._get_onedal_params(self._dtype) - self._partial_result = self._get_backend( + self._partial_result = IncrementalEmpiricalCovariance._get_backend( + IncrementalEmpiricalCovariance, "covariance", None, "partial_compute", diff --git a/onedal/decomposition/incremental_pca.py b/onedal/decomposition/incremental_pca.py index 58c852ed81..6591e7d892 100644 --- a/onedal/decomposition/incremental_pca.py +++ b/onedal/decomposition/incremental_pca.py @@ -100,7 +100,7 @@ def __init__( def _reset(self): self._need_to_finalize = False - module = self._get_backend("decomposition", "dim_reduction") + module = IncrementalPCA._get_backend(IncrementalPCA, "decomposition", "dim_reduction") if hasattr(self, "components_"): del self.components_ self._partial_result = module.partial_train_result() @@ -154,14 +154,15 @@ def partial_fit(self, X, queue): self._queue = queue - policy = self._get_policy(queue, X) + policy = IncrementalPCA._get_policy(IncrementalPCA, queue, X) X_table = to_table(X, queue=queue) if not hasattr(self, "_dtype"): self._dtype = X_table.dtype self._params = self._get_onedal_params(X_table) - self._partial_result = self._get_backend( + self._partial_result = IncrementalPCA._get_backend( + IncrementalPCA, "decomposition", "dim_reduction", "partial_train", diff --git a/onedal/linear_model/incremental_linear_model.py b/onedal/linear_model/incremental_linear_model.py index bc48d59077..fbafae94b0 100644 --- a/onedal/linear_model/incremental_linear_model.py +++ b/onedal/linear_model/incremental_linear_model.py @@ -47,8 +47,8 @@ def __init__(self, fit_intercept=True, copy_X=False, algorithm="norm_eq"): self._reset() def _reset(self): - self._partial_result = self._get_backend( - "linear_model", "regression", "partial_train_result" + self._partial_result = IncrementalLinearRegression._get_backend( + IncrementalLinearRegression, "linear_model", "regression", "partial_train_result" ) def partial_fit(self, X, y, queue=None): @@ -72,10 +72,10 @@ def partial_fit(self, X, y, queue=None): self : object Returns the instance itself. """ - module = self._get_backend("linear_model", "regression") + module = IncrementalLinearRegression._get_backend(IncrementalLinearRegression, "linear_model", "regression") self._queue = queue - policy = self._get_policy(queue, X) + policy = IncrementalLinearRegression._get_policy(IncrementalLinearRegression, queue, X) X, y = _check_X_y( X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False diff --git a/onedal/spmd/basic_statistics/incremental_basic_statistics.py b/onedal/spmd/basic_statistics/incremental_basic_statistics.py index d475c1faa7..14c90cf3b4 100644 --- a/onedal/spmd/basic_statistics/incremental_basic_statistics.py +++ b/onedal/spmd/basic_statistics/incremental_basic_statistics.py @@ -17,11 +17,8 @@ from ...basic_statistics import ( IncrementalBasicStatistics as base_IncrementalBasicStatistics, ) -from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalBasicStatistics(BaseEstimatorSPMD, base_IncrementalBasicStatistics): - @support_input_format() - def partial_fit(self, X, weights=None, queue=None): - return super().partial_fit(X, weights=weights, queue=queue) + pass diff --git a/onedal/spmd/covariance/incremental_covariance.py b/onedal/spmd/covariance/incremental_covariance.py index dd5381f3bd..749d208467 100644 --- a/onedal/spmd/covariance/incremental_covariance.py +++ b/onedal/spmd/covariance/incremental_covariance.py @@ -17,13 +17,10 @@ from ...covariance import ( IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance, ) -from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalEmpiricalCovariance( BaseEstimatorSPMD, base_IncrementalEmpiricalCovariance ): - @support_input_format() - def partial_fit(self, X, y=None, queue=None): - return super().partial_fit(X, queue=queue) + pass diff --git a/onedal/spmd/decomposition/incremental_pca.py b/onedal/spmd/decomposition/incremental_pca.py index 8708c26248..0f5afb32af 100644 --- a/onedal/spmd/decomposition/incremental_pca.py +++ b/onedal/spmd/decomposition/incremental_pca.py @@ -15,11 +15,8 @@ # ============================================================================== from ...decomposition import IncrementalPCA as base_IncrementalPCA -from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalPCA(BaseEstimatorSPMD, base_IncrementalPCA): - @support_input_format() - def partial_fit(self, X, y=None, queue=None): - return super().partial_fit(X, queue=queue) + pass diff --git a/onedal/spmd/linear_model/incremental_linear_model.py b/onedal/spmd/linear_model/incremental_linear_model.py index 6b0f205e6d..19dfd65603 100644 --- a/onedal/spmd/linear_model/incremental_linear_model.py +++ b/onedal/spmd/linear_model/incremental_linear_model.py @@ -17,11 +17,8 @@ from ...linear_model import ( IncrementalLinearRegression as base_IncrementalLinearRegression, ) -from ..._device_offload import support_input_format from .._base import BaseEstimatorSPMD class IncrementalLinearRegression(BaseEstimatorSPMD, base_IncrementalLinearRegression): - @support_input_format() - def partial_fit(self, X, y, queue=None): - return super().partial_fit(X, y, queue=queue) + pass From e455c56a8b15e36318e640cfa62a32eb707e9d8f Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Fri, 10 Jan 2025 11:46:16 -0800 Subject: [PATCH 3/9] black --- .../incremental_basic_statistics.py | 4 +++- onedal/covariance/incremental_covariance.py | 4 +++- onedal/decomposition/incremental_pca.py | 4 +++- onedal/linear_model/incremental_linear_model.py | 13 ++++++++++--- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/onedal/basic_statistics/incremental_basic_statistics.py b/onedal/basic_statistics/incremental_basic_statistics.py index d41da916de..f860adc602 100644 --- a/onedal/basic_statistics/incremental_basic_statistics.py +++ b/onedal/basic_statistics/incremental_basic_statistics.py @@ -105,7 +105,9 @@ def partial_fit(self, X, weights=None, queue=None): Returns the instance itself. """ self._queue = queue - policy = IncrementalBasicStatistics._get_policy(IncrementalBasicStatistics, queue, X) + policy = IncrementalBasicStatistics._get_policy( + IncrementalBasicStatistics, queue, X + ) X = _check_array( X, dtype=[np.float64, np.float32], ensure_2d=False, force_all_finite=False diff --git a/onedal/covariance/incremental_covariance.py b/onedal/covariance/incremental_covariance.py index c05632ebb0..99d2fa34e3 100644 --- a/onedal/covariance/incremental_covariance.py +++ b/onedal/covariance/incremental_covariance.py @@ -99,7 +99,9 @@ def partial_fit(self, X, y=None, queue=None): self._queue = queue - policy = IncrementalEmpiricalCovariance._get_policy(IncrementalEmpiricalCovariance, queue, X) + policy = IncrementalEmpiricalCovariance._get_policy( + IncrementalEmpiricalCovariance, queue, X + ) X_table = to_table(X, queue=queue) diff --git a/onedal/decomposition/incremental_pca.py b/onedal/decomposition/incremental_pca.py index 6591e7d892..03cb434a0a 100644 --- a/onedal/decomposition/incremental_pca.py +++ b/onedal/decomposition/incremental_pca.py @@ -100,7 +100,9 @@ def __init__( def _reset(self): self._need_to_finalize = False - module = IncrementalPCA._get_backend(IncrementalPCA, "decomposition", "dim_reduction") + module = IncrementalPCA._get_backend( + IncrementalPCA, "decomposition", "dim_reduction" + ) if hasattr(self, "components_"): del self.components_ self._partial_result = module.partial_train_result() diff --git a/onedal/linear_model/incremental_linear_model.py b/onedal/linear_model/incremental_linear_model.py index fbafae94b0..218debc810 100644 --- a/onedal/linear_model/incremental_linear_model.py +++ b/onedal/linear_model/incremental_linear_model.py @@ -48,7 +48,10 @@ def __init__(self, fit_intercept=True, copy_X=False, algorithm="norm_eq"): def _reset(self): self._partial_result = IncrementalLinearRegression._get_backend( - IncrementalLinearRegression, "linear_model", "regression", "partial_train_result" + IncrementalLinearRegression, + "linear_model", + "regression", + "partial_train_result", ) def partial_fit(self, X, y, queue=None): @@ -72,10 +75,14 @@ def partial_fit(self, X, y, queue=None): self : object Returns the instance itself. """ - module = IncrementalLinearRegression._get_backend(IncrementalLinearRegression, "linear_model", "regression") + module = IncrementalLinearRegression._get_backend( + IncrementalLinearRegression, "linear_model", "regression" + ) self._queue = queue - policy = IncrementalLinearRegression._get_policy(IncrementalLinearRegression, queue, X) + policy = IncrementalLinearRegression._get_policy( + IncrementalLinearRegression, queue, X + ) X, y = _check_X_y( X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False From 572bae5ab65993d4a2f8fd6944527648ee40fdc2 Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Fri, 10 Jan 2025 13:21:09 -0800 Subject: [PATCH 4/9] minor bs fix --- onedal/basic_statistics/incremental_basic_statistics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onedal/basic_statistics/incremental_basic_statistics.py b/onedal/basic_statistics/incremental_basic_statistics.py index f860adc602..f430e30334 100644 --- a/onedal/basic_statistics/incremental_basic_statistics.py +++ b/onedal/basic_statistics/incremental_basic_statistics.py @@ -125,7 +125,7 @@ def partial_fit(self, X, weights=None, queue=None): self._onedal_params = self._get_onedal_params(False, dtype=dtype) X_table, weights_table = to_table(X, weights, queue=queue) - IncrementalBasicStatistics._partial_result = self._get_backend( + self._partial_result = IncrementalBasicStatistics._get_backend( IncrementalBasicStatistics, "basic_statistics", None, From da5c27c61d54e66a7f9b5e2461c72f2b0440390a Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Tue, 14 Jan 2025 14:14:02 -0800 Subject: [PATCH 5/9] apply changes to PCA predict and add transform --- onedal/decomposition/pca.py | 11 +++++++---- onedal/decomposition/tests/test_incremental_pca.py | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/onedal/decomposition/pca.py b/onedal/decomposition/pca.py index fe6f585ba5..4abf570aae 100644 --- a/onedal/decomposition/pca.py +++ b/onedal/decomposition/pca.py @@ -119,7 +119,7 @@ def _compute_noise_variance(self, n_components, n_sf_min): return 0.0 def _create_model(self): - m = self._get_backend("decomposition", "dim_reduction", "model") + m = BasePCA._get_backend(BasePCA, "decomposition", "dim_reduction", "model") m.eigenvectors = to_table(self.components_) m.means = to_table(self.mean_) if self.whiten: @@ -128,15 +128,18 @@ def _create_model(self): return m def predict(self, X, queue=None): - policy = self._get_policy(queue, X) + policy = BasePCA._get_policy(BasePCA, queue, X) model = self._create_model() X_table = to_table(X, queue=queue) params = self._get_onedal_params(X_table, stage="predict") - result = self._get_backend( - "decomposition", "dim_reduction", "infer", policy, params, model, X_table + result = BasePCA._get_backend( + BasePCA, "decomposition", "dim_reduction", "infer", policy, params, model, X_table ) return from_table(result.transformed_data) + + def transform(self, X, queue=None): + return self.predict(X, queue=queue) class PCA(BasePCA): diff --git a/onedal/decomposition/tests/test_incremental_pca.py b/onedal/decomposition/tests/test_incremental_pca.py index f427f74fd4..94a0b1acc6 100644 --- a/onedal/decomposition/tests/test_incremental_pca.py +++ b/onedal/decomposition/tests/test_incremental_pca.py @@ -40,7 +40,7 @@ def test_on_gold_data(queue, is_deterministic, whiten, num_blocks, dtype): result = incpca.finalize_fit() - transformed_data = incpca.predict(X, queue=queue) + transformed_data = incpca.transform(X, queue=queue) expected_n_components_ = 2 expected_components_ = np.array([[0.83849224, 0.54491354], [-0.54491354, 0.83849224]]) @@ -128,7 +128,7 @@ def test_on_random_data( incpca.finalize_fit() - transformed_data = incpca.predict(X, queue=queue) + transformed_data = incpca.transform(X, queue=queue) tol = 3e-3 if transformed_data.dtype == np.float32 else 2e-6 n_components = incpca.n_components_ From 6fdcbaabf7eaf7ba4b3558cf5295b9049f920545 Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Tue, 14 Jan 2025 14:25:05 -0800 Subject: [PATCH 6/9] add comments --- onedal/basic_statistics/incremental_basic_statistics.py | 3 +++ onedal/covariance/incremental_covariance.py | 3 +++ onedal/decomposition/incremental_pca.py | 3 +++ onedal/decomposition/pca.py | 3 +++ onedal/linear_model/incremental_linear_model.py | 3 +++ 5 files changed, 15 insertions(+) diff --git a/onedal/basic_statistics/incremental_basic_statistics.py b/onedal/basic_statistics/incremental_basic_statistics.py index f430e30334..552d3b4c4a 100644 --- a/onedal/basic_statistics/incremental_basic_statistics.py +++ b/onedal/basic_statistics/incremental_basic_statistics.py @@ -71,6 +71,7 @@ def __init__(self, result_options="all"): def _reset(self): self._need_to_finalize = False + # Not supported with spmd policy so IncrementalBasicStatistics must be specified self._partial_result = IncrementalBasicStatistics._get_backend( IncrementalBasicStatistics, "basic_statistics", None, "partial_compute_result" ) @@ -105,6 +106,7 @@ def partial_fit(self, X, weights=None, queue=None): Returns the instance itself. """ self._queue = queue + # Not supported with spmd policy so IncrementalBasicStatistics must be specified policy = IncrementalBasicStatistics._get_policy( IncrementalBasicStatistics, queue, X ) @@ -125,6 +127,7 @@ def partial_fit(self, X, weights=None, queue=None): self._onedal_params = self._get_onedal_params(False, dtype=dtype) X_table, weights_table = to_table(X, weights, queue=queue) + # Not supported with spmd policy so IncrementalBasicStatistics must be specified self._partial_result = IncrementalBasicStatistics._get_backend( IncrementalBasicStatistics, "basic_statistics", diff --git a/onedal/covariance/incremental_covariance.py b/onedal/covariance/incremental_covariance.py index 99d2fa34e3..99ab6aa421 100644 --- a/onedal/covariance/incremental_covariance.py +++ b/onedal/covariance/incremental_covariance.py @@ -58,6 +58,7 @@ def __init__(self, method="dense", bias=False, assume_centered=False): def _reset(self): self._need_to_finalize = False + # Not supported with spmd policy so IncrementalEmpiricalCovariance must be specified self._partial_result = IncrementalEmpiricalCovariance._get_backend( IncrementalEmpiricalCovariance, "covariance", None, "partial_compute_result" ) @@ -99,6 +100,7 @@ def partial_fit(self, X, y=None, queue=None): self._queue = queue + # Not supported with spmd policy so IncrementalEmpiricalCovariance must be specified policy = IncrementalEmpiricalCovariance._get_policy( IncrementalEmpiricalCovariance, queue, X ) @@ -109,6 +111,7 @@ def partial_fit(self, X, y=None, queue=None): self._dtype = X_table.dtype params = self._get_onedal_params(self._dtype) + # Not supported with spmd policy so IncrementalEmpiricalCovariance must be specified self._partial_result = IncrementalEmpiricalCovariance._get_backend( IncrementalEmpiricalCovariance, "covariance", diff --git a/onedal/decomposition/incremental_pca.py b/onedal/decomposition/incremental_pca.py index 03cb434a0a..367ef14ec7 100644 --- a/onedal/decomposition/incremental_pca.py +++ b/onedal/decomposition/incremental_pca.py @@ -100,6 +100,7 @@ def __init__( def _reset(self): self._need_to_finalize = False + # Not supported with spmd policy so IncrementalPCA must be specified module = IncrementalPCA._get_backend( IncrementalPCA, "decomposition", "dim_reduction" ) @@ -156,6 +157,7 @@ def partial_fit(self, X, queue): self._queue = queue + # Not supported with spmd policy so IncrementalPCA must be specified policy = IncrementalPCA._get_policy(IncrementalPCA, queue, X) X_table = to_table(X, queue=queue) @@ -163,6 +165,7 @@ def partial_fit(self, X, queue): self._dtype = X_table.dtype self._params = self._get_onedal_params(X_table) + # Not supported with spmd policy so IncrementalPCA must be specified self._partial_result = IncrementalPCA._get_backend( IncrementalPCA, "decomposition", diff --git a/onedal/decomposition/pca.py b/onedal/decomposition/pca.py index 4abf570aae..3cae56a6ed 100644 --- a/onedal/decomposition/pca.py +++ b/onedal/decomposition/pca.py @@ -119,6 +119,7 @@ def _compute_noise_variance(self, n_components, n_sf_min): return 0.0 def _create_model(self): + # Not supported with spmd policy so BasePCA must be specified m = BasePCA._get_backend(BasePCA, "decomposition", "dim_reduction", "model") m.eigenvectors = to_table(self.components_) m.means = to_table(self.mean_) @@ -128,11 +129,13 @@ def _create_model(self): return m def predict(self, X, queue=None): + # Not supported with spmd policy so BasePCA must be specified policy = BasePCA._get_policy(BasePCA, queue, X) model = self._create_model() X_table = to_table(X, queue=queue) params = self._get_onedal_params(X_table, stage="predict") + # Not supported with spmd policy so BasePCA must be specified result = BasePCA._get_backend( BasePCA, "decomposition", "dim_reduction", "infer", policy, params, model, X_table ) diff --git a/onedal/linear_model/incremental_linear_model.py b/onedal/linear_model/incremental_linear_model.py index 218debc810..135c8826a7 100644 --- a/onedal/linear_model/incremental_linear_model.py +++ b/onedal/linear_model/incremental_linear_model.py @@ -47,6 +47,7 @@ def __init__(self, fit_intercept=True, copy_X=False, algorithm="norm_eq"): self._reset() def _reset(self): + # Not supported with spmd policy so IncrementalLinearRegression must be specified self._partial_result = IncrementalLinearRegression._get_backend( IncrementalLinearRegression, "linear_model", @@ -75,11 +76,13 @@ def partial_fit(self, X, y, queue=None): self : object Returns the instance itself. """ + # Not supported with spmd policy so IncrementalLinearRegression must be specified module = IncrementalLinearRegression._get_backend( IncrementalLinearRegression, "linear_model", "regression" ) self._queue = queue + # Not supported with spmd policy so IncrementalLinearRegression must be specified policy = IncrementalLinearRegression._get_policy( IncrementalLinearRegression, queue, X ) From 28c4eb565d3fc6a21e57f79c4d019fb06b857598 Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Tue, 21 Jan 2025 15:38:57 -0800 Subject: [PATCH 7/9] tuple indices safeguarding --- onedal/_device_offload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onedal/_device_offload.py b/onedal/_device_offload.py index 4e46592bb2..46bd928a34 100644 --- a/onedal/_device_offload.py +++ b/onedal/_device_offload.py @@ -92,7 +92,7 @@ def _transfer_to_host(queue, *data): buffer = as_usm_memory(item).copy_to_host() order = "C" - if usm_iface["strides"] is not None: + if usm_iface["strides"] is not None and len(usm_iface["strides"]) > 1: if usm_iface["strides"][0] < usm_iface["strides"][1]: order = "F" item = np.ndarray( From bc147eba4f036cbca2b9c724cf25b2e277f1a782 Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Tue, 21 Jan 2025 16:06:21 -0800 Subject: [PATCH 8/9] incremental bs fit fixes --- .../incremental_basic_statistics.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sklearnex/basic_statistics/incremental_basic_statistics.py b/sklearnex/basic_statistics/incremental_basic_statistics.py index d1ddcd55dc..29f5146bbf 100644 --- a/sklearnex/basic_statistics/incremental_basic_statistics.py +++ b/sklearnex/basic_statistics/incremental_basic_statistics.py @@ -228,16 +228,18 @@ def _onedal_partial_fit(self, X, sample_weight=None, queue=None, check_input=Tru self._need_to_finalize = True def _onedal_fit(self, X, sample_weight=None, queue=None): - if sklearn_check_version("1.2"): - self._validate_params() + use_raw_input = get_config()["use_raw_input"] + if not use_raw_input: + if sklearn_check_version("1.2"): + self._validate_params() - if sklearn_check_version("1.0"): - X = validate_data(self, X, dtype=[np.float64, np.float32]) - else: - X = check_array(X, dtype=[np.float64, np.float32]) + if sklearn_check_version("1.0"): + X = validate_data(self, X, dtype=[np.float64, np.float32]) + else: + X = check_array(X, dtype=[np.float64, np.float32]) - if sample_weight is not None: - sample_weight = _check_sample_weight(sample_weight, X) + if sample_weight is not None: + sample_weight = _check_sample_weight(sample_weight, X) n_samples, n_features = X.shape if self.batch_size is None: From 8559c5f1cdbebffb4aa3429cb41a2a536a7659cb Mon Sep 17 00:00:00 2001 From: Ethan Glaser Date: Tue, 21 Jan 2025 16:17:40 -0800 Subject: [PATCH 9/9] restore previous 2, added to raw inputs instead --- onedal/_device_offload.py | 2 +- .../incremental_basic_statistics.py | 18 ++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/onedal/_device_offload.py b/onedal/_device_offload.py index 46bd928a34..4e46592bb2 100644 --- a/onedal/_device_offload.py +++ b/onedal/_device_offload.py @@ -92,7 +92,7 @@ def _transfer_to_host(queue, *data): buffer = as_usm_memory(item).copy_to_host() order = "C" - if usm_iface["strides"] is not None and len(usm_iface["strides"]) > 1: + if usm_iface["strides"] is not None: if usm_iface["strides"][0] < usm_iface["strides"][1]: order = "F" item = np.ndarray( diff --git a/sklearnex/basic_statistics/incremental_basic_statistics.py b/sklearnex/basic_statistics/incremental_basic_statistics.py index 29f5146bbf..d1ddcd55dc 100644 --- a/sklearnex/basic_statistics/incremental_basic_statistics.py +++ b/sklearnex/basic_statistics/incremental_basic_statistics.py @@ -228,18 +228,16 @@ def _onedal_partial_fit(self, X, sample_weight=None, queue=None, check_input=Tru self._need_to_finalize = True def _onedal_fit(self, X, sample_weight=None, queue=None): - use_raw_input = get_config()["use_raw_input"] - if not use_raw_input: - if sklearn_check_version("1.2"): - self._validate_params() + if sklearn_check_version("1.2"): + self._validate_params() - if sklearn_check_version("1.0"): - X = validate_data(self, X, dtype=[np.float64, np.float32]) - else: - X = check_array(X, dtype=[np.float64, np.float32]) + if sklearn_check_version("1.0"): + X = validate_data(self, X, dtype=[np.float64, np.float32]) + else: + X = check_array(X, dtype=[np.float64, np.float32]) - if sample_weight is not None: - sample_weight = _check_sample_weight(sample_weight, X) + if sample_weight is not None: + sample_weight = _check_sample_weight(sample_weight, X) n_samples, n_features = X.shape if self.batch_size is None: