From b9e5975c65b41c57eef69c8645eea34d3fb7d53d Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 5 Dec 2022 12:24:27 -0800 Subject: [PATCH 1/4] Wrap ML functions with make_picklable_without_dask_sql --- dask_sql/physical/rel/custom/wrappers.py | 44 +++++++++++++++++------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py index c6432497b..97967c232 100644 --- a/dask_sql/physical/rel/custom/wrappers.py +++ b/dask_sql/physical/rel/custom/wrappers.py @@ -17,6 +17,8 @@ from sklearn.metrics import make_scorer from sklearn.utils.validation import check_is_fitted +from dask_sql.utils import make_pickable_without_dask_sql + try: import sklearn.base import sklearn.metrics @@ -198,17 +200,23 @@ def transform(self, X): if isinstance(X, da.Array): if output_meta is None: output_meta = _get_output_dask_ar_meta_for_estimator( - _transform, self._postfit_estimator, X + make_pickable_without_dask_sql(_transform), + self._postfit_estimator, + X, ) return X.map_blocks( - _transform, estimator=self._postfit_estimator, meta=output_meta + make_pickable_without_dask_sql(_transform), + estimator=self._postfit_estimator, + meta=output_meta, ) elif isinstance(X, dd._Frame): if output_meta is None: - output_meta = _transform(X._meta_nonempty, self._postfit_estimator) + output_meta = make_pickable_without_dask_sql(_transform)( + X._meta_nonempty, self._postfit_estimator + ) try: return X.map_partitions( - _transform, + make_pickable_without_dask_sql(_transform), self._postfit_estimator, output_meta, meta=output_meta, @@ -219,10 +227,14 @@ def transform(self, X): # for infering meta output_meta = dd.core.no_default return X.map_partitions( - _transform, estimator=self._postfit_estimator, meta=output_meta + make_pickable_without_dask_sql(_transform), + estimator=self._postfit_estimator, + meta=output_meta, ) else: - return _transform(X, estimator=self._postfit_estimator) + return make_pickable_without_dask_sql(_transform)( + X, estimator=self._postfit_estimator + ) def score(self, X, y, compute=True): """Returns the score on the given data. @@ -288,11 +300,11 @@ def predict(self, X): if isinstance(X, da.Array): if output_meta is None: output_meta = _get_output_dask_ar_meta_for_estimator( - _predict, self._postfit_estimator, X + make_pickable_without_dask_sql(_predict), self._postfit_estimator, X ) result = X.map_blocks( - _predict, + make_pickable_without_dask_sql(_predict), estimator=self._postfit_estimator, drop_axis=1, meta=output_meta, @@ -303,10 +315,12 @@ def predict(self, X): if output_meta is None: # dask-dataframe relies on dd.core.no_default # for infering meta - output_meta = _predict(X._meta_nonempty, self._postfit_estimator) + output_meta = make_pickable_without_dask_sql(_predict)( + X._meta_nonempty, self._postfit_estimator + ) try: return X.map_partitions( - _predict, + make_pickable_without_dask_sql(_predict), self._postfit_estimator, output_meta, meta=output_meta, @@ -315,10 +329,14 @@ def predict(self, X): if output_meta is None: output_meta = dd.core.no_default return X.map_partitions( - _predict, estimator=self._postfit_estimator, meta=output_meta + make_pickable_without_dask_sql(_predict), + estimator=self._postfit_estimator, + meta=output_meta, ) else: - return _predict(X, estimator=self._postfit_estimator) + return make_pickable_without_dask_sql(_predict)( + X, estimator=self._postfit_estimator + ) def predict_proba(self, X): """Probability estimates. @@ -762,7 +780,7 @@ def fit( dsk.update( { (name, i): ( - _partial_fit, + make_pickable_without_dask_sql(_partial_fit), (name, i - 1), (x_name, order[i]) + x_extra, (y_name, order[i]), From 51c73191a67a8749611c39d955386754c3d578b9 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 5 Dec 2022 12:28:56 -0800 Subject: [PATCH 2/4] Add skip_if_external_scheduler to relevant functions --- tests/unit/test_ml_wrappers.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unit/test_ml_wrappers.py b/tests/unit/test_ml_wrappers.py index 97277c1ad..0f02707b0 100644 --- a/tests/unit/test_ml_wrappers.py +++ b/tests/unit/test_ml_wrappers.py @@ -17,6 +17,8 @@ from dask_sql.physical.rel.custom.wrappers import Incremental, ParallelPostFit +from ..integration.fixtures import skip_if_external_scheduler + def _check_axis_partitioning(chunks, n_features): c = chunks[1][0] @@ -123,6 +125,7 @@ def assert_estimator_equal(left, right, exclude=None, **kwargs): _assert_eq(l, r, name=attr, **kwargs) +@skip_if_external_scheduler def test_parallelpostfit_basic(): clf = ParallelPostFit(GradientBoostingClassifier()) @@ -194,6 +197,7 @@ def test_transform(kind): assert_eq_ar(result, expected) +@skip_if_external_scheduler @pytest.mark.parametrize("dataframes", [False, True]) def test_incremental_basic(dataframes): # Create observations that we know linear models can recover From 6540327f925da45d53bf17bf0cb50993715c703b Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 5 Dec 2022 12:47:57 -0800 Subject: [PATCH 3/4] Wrap functions with decorators --- dask_sql/physical/rel/custom/wrappers.py | 37 ++++++++++-------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/dask_sql/physical/rel/custom/wrappers.py b/dask_sql/physical/rel/custom/wrappers.py index 97967c232..244d2ff42 100644 --- a/dask_sql/physical/rel/custom/wrappers.py +++ b/dask_sql/physical/rel/custom/wrappers.py @@ -200,23 +200,21 @@ def transform(self, X): if isinstance(X, da.Array): if output_meta is None: output_meta = _get_output_dask_ar_meta_for_estimator( - make_pickable_without_dask_sql(_transform), + _transform, self._postfit_estimator, X, ) return X.map_blocks( - make_pickable_without_dask_sql(_transform), + _transform, estimator=self._postfit_estimator, meta=output_meta, ) elif isinstance(X, dd._Frame): if output_meta is None: - output_meta = make_pickable_without_dask_sql(_transform)( - X._meta_nonempty, self._postfit_estimator - ) + output_meta = _transform(X._meta_nonempty, self._postfit_estimator) try: return X.map_partitions( - make_pickable_without_dask_sql(_transform), + _transform, self._postfit_estimator, output_meta, meta=output_meta, @@ -227,14 +225,12 @@ def transform(self, X): # for infering meta output_meta = dd.core.no_default return X.map_partitions( - make_pickable_without_dask_sql(_transform), + _transform, estimator=self._postfit_estimator, meta=output_meta, ) else: - return make_pickable_without_dask_sql(_transform)( - X, estimator=self._postfit_estimator - ) + return _transform(X, estimator=self._postfit_estimator) def score(self, X, y, compute=True): """Returns the score on the given data. @@ -300,11 +296,11 @@ def predict(self, X): if isinstance(X, da.Array): if output_meta is None: output_meta = _get_output_dask_ar_meta_for_estimator( - make_pickable_without_dask_sql(_predict), self._postfit_estimator, X + _predict, self._postfit_estimator, X ) result = X.map_blocks( - make_pickable_without_dask_sql(_predict), + _predict, estimator=self._postfit_estimator, drop_axis=1, meta=output_meta, @@ -315,12 +311,10 @@ def predict(self, X): if output_meta is None: # dask-dataframe relies on dd.core.no_default # for infering meta - output_meta = make_pickable_without_dask_sql(_predict)( - X._meta_nonempty, self._postfit_estimator - ) + output_meta = _predict(X._meta_nonempty, self._postfit_estimator) try: return X.map_partitions( - make_pickable_without_dask_sql(_predict), + _predict, self._postfit_estimator, output_meta, meta=output_meta, @@ -329,14 +323,12 @@ def predict(self, X): if output_meta is None: output_meta = dd.core.no_default return X.map_partitions( - make_pickable_without_dask_sql(_predict), + _predict, estimator=self._postfit_estimator, meta=output_meta, ) else: - return make_pickable_without_dask_sql(_predict)( - X, estimator=self._postfit_estimator - ) + return _predict(X, estimator=self._postfit_estimator) def predict_proba(self, X): """Probability estimates. @@ -571,6 +563,7 @@ def partial_fit(self, X, y=None, **fit_kwargs): return self._fit_for_estimator(estimator, X, y, **fit_kwargs) +@make_pickable_without_dask_sql def _predict(part, estimator, output_meta=None): if part.shape[0] == 0 and output_meta is not None: empty_output = handle_empty_partitions(output_meta) @@ -579,6 +572,7 @@ def _predict(part, estimator, output_meta=None): return estimator.predict(part) +@make_pickable_without_dask_sql def _predict_proba(part, estimator, output_meta=None): if part.shape[0] == 0 and output_meta is not None: empty_output = handle_empty_partitions(output_meta) @@ -587,6 +581,7 @@ def _predict_proba(part, estimator, output_meta=None): return estimator.predict_proba(part) +@make_pickable_without_dask_sql def _transform(part, estimator, output_meta=None): if part.shape[0] == 0 and output_meta is not None: empty_output = handle_empty_partitions(output_meta) @@ -780,7 +775,7 @@ def fit( dsk.update( { (name, i): ( - make_pickable_without_dask_sql(_partial_fit), + _partial_fit, (name, i - 1), (x_name, order[i]) + x_extra, (y_name, order[i]), From 8b75dd78446390e24315d88bdf7d2a1d10f45c40 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 5 Dec 2022 12:51:04 -0800 Subject: [PATCH 4/4] [test-upstream]