Unverified commit fc6b71e0 authored by James Lamb, committed by GitHub
Browse files

[dask] Support Dask dataframes with 'category' columns (fixes #3861) (#3908)



* add support for pandas categorical columns

* remove commented code

* quotes

* syntax error

* fix shape for ranker test

* Apply suggestions from code review
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>

* Update tests/python_package_test/test_dask.py

* trying

* fix tests

* remove unnecessary debugging stuff

* skip accuracy checks on categorical

* use category columns as categorical features
Co-authored-by: Nikita Titov <nekit94-08@mail.ru>
parent e31244cf
...@@ -350,13 +350,12 @@ def _predict_part( ...@@ -350,13 +350,12 @@ def _predict_part(
pred_contrib: bool, pred_contrib: bool,
**kwargs: Any **kwargs: Any
) -> _DaskPart: ) -> _DaskPart:
data = part.values if isinstance(part, pd_DataFrame) else part
if data.shape[0] == 0: if part.shape[0] == 0:
result = np.array([]) result = np.array([])
elif pred_proba: elif pred_proba:
result = model.predict_proba( result = model.predict_proba(
data, part,
raw_score=raw_score, raw_score=raw_score,
pred_leaf=pred_leaf, pred_leaf=pred_leaf,
pred_contrib=pred_contrib, pred_contrib=pred_contrib,
...@@ -364,13 +363,14 @@ def _predict_part( ...@@ -364,13 +363,14 @@ def _predict_part(
) )
else: else:
result = model.predict( result = model.predict(
data, part,
raw_score=raw_score, raw_score=raw_score,
pred_leaf=pred_leaf, pred_leaf=pred_leaf,
pred_contrib=pred_contrib, pred_contrib=pred_contrib,
**kwargs **kwargs
) )
# dask.DataFrame.map_partitions() expects each call to return a pandas DataFrame or Series
if isinstance(part, pd_DataFrame): if isinstance(part, pd_DataFrame):
if pred_proba or pred_contrib: if pred_proba or pred_contrib:
result = pd_DataFrame(result, index=part.index) result = pd_DataFrame(result, index=part.index)
......
...@@ -35,7 +35,7 @@ from .utils import make_ranking ...@@ -35,7 +35,7 @@ from .utils import make_ranking
# see https://distributed.dask.org/en/latest/api.html#distributed.Client.close # see https://distributed.dask.org/en/latest/api.html#distributed.Client.close
CLIENT_CLOSE_TIMEOUT = 120 CLIENT_CLOSE_TIMEOUT = 120
data_output = ['array', 'scipy_csr_matrix', 'dataframe'] data_output = ['array', 'scipy_csr_matrix', 'dataframe', 'dataframe-with-categorical']
data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]] data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]
group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50] group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50]
...@@ -60,9 +60,18 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs) ...@@ -60,9 +60,18 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
w = rnd.rand(X.shape[0]) * 0.01 w = rnd.rand(X.shape[0]) * 0.01
g_rle = np.array([len(list(grp)) for _, grp in groupby(g)]) g_rle = np.array([len(list(grp)) for _, grp in groupby(g)])
if output == 'dataframe': if output.startswith('dataframe'):
# add target, weight, and group to DataFrame so that partitions abide by group boundaries. # add target, weight, and group to DataFrame so that partitions abide by group boundaries.
X_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])]) X_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
if output == 'dataframe-with-categorical':
for i in range(5):
col_name = "cat_col" + str(i)
cat_values = rnd.choice(['a', 'b'], X.shape[0])
cat_series = pd.Series(
cat_values,
dtype='category'
)
X_df[col_name] = cat_series
X = X_df.copy() X = X_df.copy()
X_df = X_df.assign(y=y, g=g, w=w) X_df = X_df.assign(y=y, g=g, w=w)
...@@ -115,8 +124,27 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size ...@@ -115,8 +124,27 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size
dX = da.from_array(X, (chunk_size, X.shape[1])) dX = da.from_array(X, (chunk_size, X.shape[1]))
dy = da.from_array(y, chunk_size) dy = da.from_array(y, chunk_size)
dw = da.from_array(weights, chunk_size) dw = da.from_array(weights, chunk_size)
elif output == 'dataframe': elif output.startswith('dataframe'):
X_df = pd.DataFrame(X, columns=['feature_%d' % i for i in range(X.shape[1])]) X_df = pd.DataFrame(X, columns=['feature_%d' % i for i in range(X.shape[1])])
if output == 'dataframe-with-categorical':
num_cat_cols = 5
for i in range(num_cat_cols):
col_name = "cat_col" + str(i)
cat_values = rnd.choice(['a', 'b'], X.shape[0])
cat_series = pd.Series(
cat_values,
dtype='category'
)
X_df[col_name] = cat_series
X = np.hstack((X, cat_series.cat.codes.values.reshape(-1, 1)))
# for the small data sizes used in tests, it's hard to get LGBMRegressor to choose
# categorical features for splits. So for regression tests with categorical features,
# _create_data() returns a DataFrame with ONLY categorical features
if objective == 'regression':
cat_cols = [col for col in X_df.columns if col.startswith('cat_col')]
X_df = X_df[cat_cols]
X = X[:, -num_cat_cols:]
y_df = pd.Series(y, name='target') y_df = pd.Series(y, name='target')
dX = dd.from_pandas(X_df, chunksize=chunk_size) dX = dd.from_pandas(X_df, chunksize=chunk_size)
dy = dd.from_pandas(y_df, chunksize=chunk_size) dy = dd.from_pandas(y_df, chunksize=chunk_size)
...@@ -180,6 +208,12 @@ def test_classifier(output, centers, client, listen_port): ...@@ -180,6 +208,12 @@ def test_classifier(output, centers, client, listen_port):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]
dask_classifier = lgb.DaskLGBMClassifier( dask_classifier = lgb.DaskLGBMClassifier(
client=client, client=client,
time_out=5, time_out=5,
...@@ -207,6 +241,18 @@ def test_classifier(output, centers, client, listen_port): ...@@ -207,6 +241,18 @@ def test_classifier(output, centers, client, listen_port):
assert_eq(p1_local, p2) assert_eq(p1_local, p2)
assert_eq(y, p1_local) assert_eq(y, p1_local)
# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_classifier.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='
client.close(timeout=CLIENT_CLOSE_TIMEOUT) client.close(timeout=CLIENT_CLOSE_TIMEOUT)
...@@ -223,6 +269,12 @@ def test_classifier_pred_contrib(output, centers, client, listen_port): ...@@ -223,6 +269,12 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]
dask_classifier = lgb.DaskLGBMClassifier( dask_classifier = lgb.DaskLGBMClassifier(
client=client, client=client,
time_out=5, time_out=5,
...@@ -240,6 +292,18 @@ def test_classifier_pred_contrib(output, centers, client, listen_port): ...@@ -240,6 +292,18 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
if output == 'scipy_csr_matrix': if output == 'scipy_csr_matrix':
preds_with_contrib = np.array(preds_with_contrib.todense()) preds_with_contrib = np.array(preds_with_contrib.todense())
# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_classifier.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='
# shape depends on whether it is binary or multiclass classification # shape depends on whether it is binary or multiclass classification
num_features = dask_classifier.n_features_ num_features = dask_classifier.n_features_
num_classes = dask_classifier.n_classes_ num_classes = dask_classifier.n_classes_
...@@ -301,6 +365,12 @@ def test_regressor(output, client, listen_port): ...@@ -301,6 +365,12 @@ def test_regressor(output, client, listen_port):
"random_state": 42, "random_state": 42,
"num_leaves": 10 "num_leaves": 10
} }
if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]
dask_regressor = lgb.DaskLGBMRegressor( dask_regressor = lgb.DaskLGBMRegressor(
client=client, client=client,
time_out=5, time_out=5,
...@@ -310,7 +380,7 @@ def test_regressor(output, client, listen_port): ...@@ -310,7 +380,7 @@ def test_regressor(output, client, listen_port):
) )
dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw) dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw)
p1 = dask_regressor.predict(dX) p1 = dask_regressor.predict(dX)
if output != 'dataframe': if not output.startswith('dataframe'):
s1 = _r2_score(dy, p1) s1 = _r2_score(dy, p1)
p1 = p1.compute() p1 = p1.compute()
p1_local = dask_regressor.to_local().predict(X) p1_local = dask_regressor.to_local().predict(X)
...@@ -322,15 +392,33 @@ def test_regressor(output, client, listen_port): ...@@ -322,15 +392,33 @@ def test_regressor(output, client, listen_port):
p2 = local_regressor.predict(X) p2 = local_regressor.predict(X)
# Scores should be the same # Scores should be the same
if output != 'dataframe': if not output.startswith('dataframe'):
assert_eq(s1, s2, atol=.01) assert_eq(s1, s2, atol=.01)
assert_eq(s1, s1_local, atol=.003) assert_eq(s1, s1_local, atol=.003)
# Predictions should be roughly the same # Predictions should be roughly the same.
assert_eq(y, p1, rtol=1., atol=100.)
assert_eq(y, p2, rtol=1., atol=50.)
assert_eq(p1, p1_local) assert_eq(p1, p1_local)
# The checks below are skipped
# for the categorical data case because it's difficult to get
# a good fit from just categoricals for a regression problem
# with small data
if output != 'dataframe-with-categorical':
assert_eq(y, p1, rtol=1., atol=100.)
assert_eq(y, p2, rtol=1., atol=50.)
# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_regressor.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='
client.close(timeout=CLIENT_CLOSE_TIMEOUT) client.close(timeout=CLIENT_CLOSE_TIMEOUT)
...@@ -345,6 +433,12 @@ def test_regressor_pred_contrib(output, client, listen_port): ...@@ -345,6 +433,12 @@ def test_regressor_pred_contrib(output, client, listen_port):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]
dask_regressor = lgb.DaskLGBMRegressor( dask_regressor = lgb.DaskLGBMRegressor(
client=client, client=client,
time_out=5, time_out=5,
...@@ -368,6 +462,18 @@ def test_regressor_pred_contrib(output, client, listen_port): ...@@ -368,6 +462,18 @@ def test_regressor_pred_contrib(output, client, listen_port):
assert preds_with_contrib.shape[1] == num_features + 1 assert preds_with_contrib.shape[1] == num_features + 1
assert preds_with_contrib.shape == local_preds_with_contrib.shape assert preds_with_contrib.shape == local_preds_with_contrib.shape
# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_regressor.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='
client.close(timeout=CLIENT_CLOSE_TIMEOUT) client.close(timeout=CLIENT_CLOSE_TIMEOUT)
...@@ -386,6 +492,12 @@ def test_regressor_quantile(output, client, listen_port, alpha): ...@@ -386,6 +492,12 @@ def test_regressor_quantile(output, client, listen_port, alpha):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]
dask_regressor = lgb.DaskLGBMRegressor( dask_regressor = lgb.DaskLGBMRegressor(
client=client, client=client,
local_listen_port=listen_port, local_listen_port=listen_port,
...@@ -405,17 +517,37 @@ def test_regressor_quantile(output, client, listen_port, alpha): ...@@ -405,17 +517,37 @@ def test_regressor_quantile(output, client, listen_port, alpha):
np.testing.assert_allclose(q1, alpha, atol=0.2) np.testing.assert_allclose(q1, alpha, atol=0.2)
np.testing.assert_allclose(q2, alpha, atol=0.2) np.testing.assert_allclose(q2, alpha, atol=0.2)
# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_regressor.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='
client.close(timeout=CLIENT_CLOSE_TIMEOUT) client.close(timeout=CLIENT_CLOSE_TIMEOUT)
@pytest.mark.parametrize('output', ['array', 'dataframe']) @pytest.mark.parametrize('output', ['array', 'dataframe', 'dataframe-with-categorical'])
@pytest.mark.parametrize('group', [None, group_sizes]) @pytest.mark.parametrize('group', [None, group_sizes])
def test_ranker(output, client, listen_port, group): def test_ranker(output, client, listen_port, group):
X, y, w, g, dX, dy, dw, dg = _create_ranking_data( if output == 'dataframe-with-categorical':
output=output, X, y, w, g, dX, dy, dw, dg = _create_ranking_data(
group=group output=output,
) group=group,
n_features=1,
n_informative=1
)
else:
X, y, w, g, dX, dy, dw, dg = _create_ranking_data(
output=output,
group=group,
)
# rebalance small dask.array dataset for better performance. # rebalance small dask.array dataset for better performance.
if output == 'array': if output == 'array':
...@@ -434,6 +566,12 @@ def test_ranker(output, client, listen_port, group): ...@@ -434,6 +566,12 @@ def test_ranker(output, client, listen_port, group):
"num_leaves": 20, "num_leaves": 20,
"min_child_samples": 1 "min_child_samples": 1
} }
if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]
dask_ranker = lgb.DaskLGBMRanker( dask_ranker = lgb.DaskLGBMRanker(
client=client, client=client,
time_out=5, time_out=5,
...@@ -457,6 +595,18 @@ def test_ranker(output, client, listen_port, group): ...@@ -457,6 +595,18 @@ def test_ranker(output, client, listen_port, group):
assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.8 assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.8
assert_eq(rnkvec_dask, rnkvec_dask_local) assert_eq(rnkvec_dask, rnkvec_dask_local)
# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_ranker.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='
client.close(timeout=CLIENT_CLOSE_TIMEOUT) client.close(timeout=CLIENT_CLOSE_TIMEOUT)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment