ENH make Random*Sampler accept dask array and dataframe #777

Open · wants to merge 32 commits into master

Changes from 1 commit
Commit message: iter
glemaitre committed Nov 6, 2020
commit 00c0a265f1930becf6a332195629dc9a2b917757
3 changes: 2 additions & 1 deletion imblearn/base.py
@@ -10,11 +10,11 @@

 from sklearn.base import BaseEstimator
 from sklearn.preprocessing import label_binarize
-from sklearn.utils.multiclass import check_classification_targets

 from .utils import check_sampling_strategy, check_target_type
 from .utils._validation import ArraysTransformer
 from .utils._validation import _deprecate_positional_args
+from .utils.wrapper import check_classification_targets


 class SamplerMixin(BaseEstimator, metaclass=ABCMeta):
@@ -82,6 +82,7 @@ def fit_resample(self, X, y):

         output = self._fit_resample(X, y)

+        # TODO: label binarize is not implemented with dask
         y_ = (label_binarize(output[1], np.unique(y))
               if binarize_y else output[1])
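The TODO above notes that label_binarize has no dask implementation yet. A minimal dask-friendly stand-in, one-hot encoding via a lazy broadcast comparison, could look like this (a sketch, not part of this PR; dask_label_binarize is a hypothetical helper):

import dask.array as da
import numpy as np

def dask_label_binarize(y, classes):
    # Lazily one-hot encode: compare each element of y against every label.
    # y: 1-d dask array; classes: 1-d numpy array of the known labels.
    return (y[:, None] == classes[None, :]).astype(int)

y = da.from_array(np.array([0, 1, 2, 1]), chunks=2)
print(dask_label_binarize(y, np.array([0, 1, 2])).compute())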

17 changes: 15 additions & 2 deletions imblearn/dask/utils.py
@@ -9,7 +9,10 @@ def is_multilabel(y):
     if not (y.ndim == 2 and y.shape[1] > 1):
         return False

-    labels = np.unique(y).compute()
+    if hasattr(y, "unique"):
+        labels = np.asarray(y.unique())
+    else:
+        labels = np.unique(y).compute()
Comment on lines +12 to +15

I've struggled with this check in dask-ml. Depending on where it's called, it's potentially very expensive (you might be loading a ton of data just to check if it's multi-label, and then loading it again to do the training).

Whenever possible, it's helpful to provide an option to skip this check by having the user specify it when creating the estimator, or in a keyword to fit (dunno if that applies here).
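One hypothetical shape for that opt-out, with a validate_y keyword assumed purely for illustration:

from sklearn.utils.multiclass import check_classification_targets

def fit_resample(X, y, *, validate_y=True):
    # Hypothetical keyword, not part of this PR: the caller can skip the
    # target check when y is a large, lazily-loaded dask collection.
    if validate_y:
        check_classification_targets(y)  # may materialize all of y
    ...  # resampling logic would follow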

Member Author

I thought about it. Do you think that having a context manager outside would make sense:

with set_config(avoid_check=True):
    # some imblearn/scikit-learn/dask code

Though, we might get into trouble with issues related to scikit-learn/scikit-learn#18736

It might just be easier to have an optional class parameter that applies only for dask arrays.
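For comparison, a rough sketch of that per-estimator alternative; the flag name and the local is_dask_container stand-in are assumptions for illustration only:

def is_dask_container(obj):
    # Crude stand-in for the PR's helper: detect dask collections by module.
    return type(obj).__module__.startswith("dask")

class DaskFriendlySampler:
    # Illustration only: validation is skipped for dask inputs unless the
    # user opts in at construction time.
    def __init__(self, *, validate_dask_input=False):
        self.validate_dask_input = validate_dask_input

    def _should_validate(self, y):
        return not is_dask_container(y) or self.validate_dask_input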


     return len(labels) < 3 and (
         y.dtype.kind in 'biu' or _is_integral_float(labels)
@@ -39,7 +42,10 @@ def type_of_target(y):
         # NOTE: we don't check for infinite values
         return 'continuous' + suffix

-    labels = np.unique(y).compute()
+    if hasattr(y, "unique"):
+        labels = np.asarray(y.unique())
+    else:
+        labels = np.unique(y).compute()
     if (len((labels)) > 2) or (y.ndim >= 2 and len(y[0]) > 1):
         # [1, 2, 3] or [[1., 2., 3]] or [[1, 2]]
         return 'multiclass' + suffix
@@ -63,3 +69,10 @@ def column_or_1d(y, *, warn=False):
         raise ValueError(
             f"y should be a 1d array. Got an array of shape {shape} instead."
         )
+
+
+def check_classification_targets(y):
+    y_type = type_of_target(y)
+    if y_type not in ['binary', 'multiclass', 'multiclass-multioutput',
+                      'multilabel-indicator', 'multilabel-sequences']:
+        raise ValueError("Unknown label type: %r" % y_type)
@@ -81,6 +81,9 @@ def __init__(
         self.replacement = replacement

     def _check_X_y(self, X, y):
+        if is_dask_container(y) and hasattr(y, "to_dask_array"):
+            y = y.to_dask_array()
+            y.compute_chunk_sizes()


In Dask-ML we (@stsievert I think? maybe me?) prefer to have the user do this: https://github.com/dask/dask-ml/blob/7e11ce1505a485104e02d49a3620c8264e63e12e/dask_ml/utils.py#L166-L173. If you're just fitting the one estimator then this is probably equivalent. If you're doing something like a cross_val_score, then I think this would end up loading data multiple times just to compute the chunk sizes.

Member Author

This is something that I was unsure of here. If I recall correctly, the issue was that I could not call ravel on the Series, and converting was the easiest way to always have an array and then convert back to a Series reusing the metadata.

However, if we assume that the checks are too expensive to run in a distributed setting, we don't need to call the check below and we can directly pass the Series and handle it during the resampling.

So we have fewer safeguards, but at least it is more performant, which is probably what you want in a distributed setting.
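Under the dask-ml convention linked above, the conversion would instead happen once on the user side before any fitting, roughly like this (a sketch assuming a dask Series built from pandas):

import dask.dataframe as dd
import numpy as np
import pandas as pd

y = dd.from_pandas(pd.Series(np.r_[[0] * 50, [1] * 50]), npartitions=4)
# Convert once, up front: lengths=True computes the chunk sizes a single
# time, so repeated fits (e.g. under cross_val_score) don't redo that work.
y = y.to_dask_array(lengths=True)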

         y, binarize_y, self._uniques = check_target_type(
             y,
             indicate_one_vs_all=True,
@@ -95,6 +98,9 @@ def _check_X_y(self, X, y):
                 dtype=None,
                 force_all_finite=False,
             )
+        elif is_dask_container(X) and hasattr(X, "to_dask_array"):
+            X = X.to_dask_array()
+            X.compute_chunk_sizes()
         return X, y, binarize_y

     @staticmethod

@@ -140,7 +146,7 @@ def _more_tags(self):
"2darray",
"string",
"dask-array",
# "dask-dataframe"
"dask-dataframe"
],
"sample_indices": True,
"allow_nan": True,
28 changes: 26 additions & 2 deletions imblearn/utils/_validation.py
@@ -47,6 +47,9 @@ def transform(self, X, y):
     def _gets_props(self, array):
         props = {}
         props["type"] = array.__class__.__name__
+        if props["type"].lower() in ("series", "dataframe"):
+            suffix = "dask-" if is_dask_container(array) else "pandas-"
+            props["type"] = suffix + props["type"]
         props["columns"] = getattr(array, "columns", None)
         props["name"] = getattr(array, "name", None)
         props["dtypes"] = getattr(array, "dtypes", None)
@@ -56,13 +59,34 @@ def _transfrom_one(self, array, props):
         type_ = props["type"].lower()
         if type_ == "list":
             ret = array.tolist()
-        elif type_ == "dataframe":
+        elif type_ == "pandas-dataframe":
             import pandas as pd

             ret = pd.DataFrame(array, columns=props["columns"])
             ret = ret.astype(props["dtypes"])
-        elif type_ == "series":
+        elif type_ == "pandas-series":
             import pandas as pd

             ret = pd.Series(array, dtype=props["dtypes"], name=props["name"])
+        elif type_ == "dask-dataframe":
+            from dask import dataframe
+
+            if is_dask_container(array):
+                ret = dataframe.from_dask_array(
+                    array, columns=props["columns"]
+                )
+            else:
+                ret = dataframe.from_array(array, columns=props["columns"])
+            ret = ret.astype(props["dtypes"])
+        elif type_ == "dask-series":
+            from dask import dataframe
+
+            if is_dask_container(array):
+                ret = dataframe.from_dask_array(array)
+            else:
+                ret = dataframe.from_array(array)
+            ret = ret.astype(props["dtypes"])
+            ret = ret.rename(props["name"])
         else:
             ret = array
         return ret
11 changes: 5 additions & 6 deletions imblearn/utils/estimator_checks.py
@@ -333,6 +333,7 @@ def check_samplers_dask_dataframe(name, sampler):
         X, columns=[str(i) for i in range(X.shape[1])]
     )
     y_s = dask.dataframe.from_array(y)
+    y_s = y_s.rename("target")

     X_res_df, y_res_s = sampler.fit_resample(X_df, y_s)
     X_res, y_res = sampler.fit_resample(X, y)
@@ -341,13 +342,11 @@
     assert isinstance(X_res_df, dask.dataframe.DataFrame)
     assert isinstance(y_res_s, dask.dataframe.Series)

-    # assert X_df.columns.to_list() == X_res_df.columns.to_list()
-    # assert y_df.columns.to_list() == y_res_df.columns.to_list()
-    # assert y_s.name == y_res_s.name
+    assert X_df.columns.to_list() == X_res_df.columns.to_list()
+    assert y_s.name == y_res_s.name

-    # assert_allclose(X_res_df.to_numpy(), X_res)
-    # assert_allclose(y_res_df.to_numpy().ravel(), y_res)
-    # assert_allclose(y_res_s.to_numpy(), y_res)
+    assert_allclose(np.array(X_res_df), X_res)
+    assert_allclose(np.array(y_res_s), y_res)


 def check_samplers_list(name, sampler):
24 changes: 19 additions & 5 deletions imblearn/utils/wrapper.py
@@ -1,5 +1,7 @@
 import numpy as np

+from sklearn.utils.multiclass import check_classification_targets as \
+    sklearn_check_classification_targets
 from sklearn.utils.multiclass import type_of_target as sklearn_type_of_target
 from sklearn.utils.validation import column_or_1d as sklearn_column_or_1d

@@ -30,8 +32,20 @@ def column_or_1d(y, *, warn=False):
     return sklearn_column_or_1d(y, warn=warn)


-def unique(*args, **kwargs):
-    output = np.unique(args, kwargs)
-    if is_dask_container(output):
-        return (arr.compute() for arr in output)
-    return output
+def unique(arr, **kwargs):
+    if is_dask_container(arr):
+        if hasattr(arr, "unique"):
+            output = np.asarray(arr.unique(**kwargs))
+        else:
+            output = np.unique(arr).compute()
+        return output
+    return np.unique(arr, **kwargs)
+
+
+def check_classification_targets(y):
+    if is_dask_container(y):
+        from ..dask.utils import check_classification_targets as \
+            dask_check_classification_targets
+
+        return dask_check_classification_targets(y)
+    return sklearn_check_classification_targets(y)
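A usage sketch of the dispatching wrapper on this branch (assuming dask is installed): the same call accepts numpy and dask inputs and always returns concrete labels:

import dask.array as da
import numpy as np

from imblearn.utils.wrapper import unique

y_np = np.array([0, 1, 1, 2])
y_da = da.from_array(y_np, chunks=2)

assert list(unique(y_np)) == [0, 1, 2]  # plain numpy path
assert list(unique(y_da)) == [0, 1, 2]  # dask path, computed eagerly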