Unverified commit 1d7b54d3, authored by jmoralez, committed by GitHub
Browse files

[dask] include multiclass-classification task in tests (#4048)

* include multiclass-classification task and task_to_model_factory dicts

* define centers coordinates. flatten init_scores within each partition for multiclass-classification

* include issue comment and fix linting error
parent 13680d89
...@@ -43,10 +43,21 @@ sk_version = parse_version(sk_version) ...@@ -43,10 +43,21 @@ sk_version = parse_version(sk_version)
# see https://distributed.dask.org/en/latest/api.html#distributed.Client.close # see https://distributed.dask.org/en/latest/api.html#distributed.Client.close
CLIENT_CLOSE_TIMEOUT = 120 CLIENT_CLOSE_TIMEOUT = 120
tasks = ['classification', 'regression', 'ranking'] tasks = ['binary-classification', 'multiclass-classification', 'regression', 'ranking']
data_output = ['array', 'scipy_csr_matrix', 'dataframe', 'dataframe-with-categorical'] data_output = ['array', 'scipy_csr_matrix', 'dataframe', 'dataframe-with-categorical']
data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]
group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50] group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50]
# Lookup table from task name to the lgb.DaskLGBM* factory for that task.
# Tests index it with their parametrized `task` value instead of repeating an
# if/elif chain. Both classification task names map to the same
# lgb.DaskLGBMClassifier.
task_to_dask_factory = {
'regression': lgb.DaskLGBMRegressor,
'binary-classification': lgb.DaskLGBMClassifier,
'multiclass-classification': lgb.DaskLGBMClassifier,
'ranking': lgb.DaskLGBMRanker
}
# Lookup table from task name to the lgb.LGBM* factory (the names without the
# "Dask" prefix) for that task; it uses the same keys as task_to_dask_factory.
# Both classification task names map to the same lgb.LGBMClassifier.
task_to_local_factory = {
'regression': lgb.LGBMRegressor,
'binary-classification': lgb.LGBMClassifier,
'multiclass-classification': lgb.LGBMClassifier,
'ranking': lgb.LGBMRanker
}
pytestmark = [ pytestmark = [
pytest.mark.skipif(getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'), pytest.mark.skipif(getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'),
...@@ -120,8 +131,14 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs) ...@@ -120,8 +131,14 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
return X, y, w, g_rle, dX, dy, dw, dg return X, y, w, g_rle, dX, dy, dw, dg
def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size=50): def _create_data(objective, n_samples=100, output='array', chunk_size=50):
if objective == 'classification': if objective.endswith('classification'):
if objective == 'binary-classification':
centers = [[-4, -4], [4, 4]]
elif objective == 'multiclass-classification':
centers = [[-4, -4], [4, 4], [-4, 4]]
else:
raise ValueError(f"Unknown classification task '{objective}'")
X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42) X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42)
elif objective == 'regression': elif objective == 'regression':
X, y = make_regression(n_samples=n_samples, random_state=42) X, y = make_regression(n_samples=n_samples, random_state=42)
...@@ -206,12 +223,11 @@ def _unpickle(filepath, serializer): ...@@ -206,12 +223,11 @@ def _unpickle(filepath, serializer):
@pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('centers', data_centers) @pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
def test_classifier(output, centers, client): def test_classifier(output, task, client):
X, y, w, dX, dy, dw = _create_data( X, y, w, dX, dy, dw = _create_data(
objective='classification', objective=task,
output=output, output=output
centers=centers
) )
params = { params = {
...@@ -273,12 +289,11 @@ def test_classifier(output, centers, client): ...@@ -273,12 +289,11 @@ def test_classifier(output, centers, client):
@pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('centers', data_centers) @pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
def test_classifier_pred_contrib(output, centers, client): def test_classifier_pred_contrib(output, task, client):
X, y, w, dX, dy, dw = _create_data( X, y, w, dX, dy, dw = _create_data(
objective='classification', objective=task,
output=output, output=output
centers=centers
) )
params = { params = {
...@@ -354,7 +369,7 @@ def test_find_random_open_port(client): ...@@ -354,7 +369,7 @@ def test_find_random_open_port(client):
def test_training_does_not_fail_on_port_conflicts(client): def test_training_does_not_fail_on_port_conflicts(client):
_, _, _, dX, dy, dw = _create_data('classification', output='array') _, _, _, dX, dy, dw = _create_data('binary-classification', output='array')
lightgbm_default_port = 12400 lightgbm_default_port = 12400
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
...@@ -640,17 +655,13 @@ def test_training_works_if_client_not_provided_or_set_after_construction(task, c ...@@ -640,17 +655,13 @@ def test_training_works_if_client_not_provided_or_set_after_construction(task, c
output='array', output='array',
group=None group=None
) )
model_factory = lgb.DaskLGBMRanker
else: else:
_, _, _, dX, dy, _ = _create_data( _, _, _, dX, dy, _ = _create_data(
objective=task, objective=task,
output='array', output='array',
) )
dg = None dg = None
if task == 'classification': model_factory = task_to_dask_factory[task]
model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor
params = { params = {
"time_out": 5, "time_out": 5,
...@@ -744,12 +755,7 @@ def test_model_and_local_version_are_picklable_whether_or_not_client_set_explici ...@@ -744,12 +755,7 @@ def test_model_and_local_version_are_picklable_whether_or_not_client_set_explici
) )
dg_2 = None dg_2 = None
if task == 'ranking': model_factory = task_to_dask_factory[task]
model_factory = lgb.DaskLGBMRanker
elif task == 'classification':
model_factory = lgb.DaskLGBMClassifier
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor
params = { params = {
"time_out": 5, "time_out": 5,
...@@ -970,8 +976,6 @@ def test_training_succeeds_even_if_some_workers_do_not_have_any_data(client, tas ...@@ -970,8 +976,6 @@ def test_training_succeeds_even_if_some_workers_do_not_have_any_data(client, tas
output=output, output=output,
group=None group=None
) )
dask_model_factory = lgb.DaskLGBMRanker
local_model_factory = lgb.LGBMRanker
else: else:
X, y, w, dX, dy, dw = _create_data( X, y, w, dX, dy, dw = _create_data(
objective=task, objective=task,
...@@ -979,12 +983,9 @@ def test_training_succeeds_even_if_some_workers_do_not_have_any_data(client, tas ...@@ -979,12 +983,9 @@ def test_training_succeeds_even_if_some_workers_do_not_have_any_data(client, tas
) )
g = None g = None
dg = None dg = None
if task == 'classification':
dask_model_factory = lgb.DaskLGBMClassifier dask_model_factory = task_to_dask_factory[task]
local_model_factory = lgb.LGBMClassifier local_model_factory = task_to_local_factory[task]
elif task == 'regression':
dask_model_factory = lgb.DaskLGBMRegressor
local_model_factory = lgb.LGBMRegressor
dX = collection_to_single_partition(dX) dX = collection_to_single_partition(dX)
dy = collection_to_single_partition(dy) dy = collection_to_single_partition(dy)
...@@ -1029,7 +1030,6 @@ def test_network_params_not_required_but_respected_if_given(client, task, output ...@@ -1029,7 +1030,6 @@ def test_network_params_not_required_but_respected_if_given(client, task, output
group=None, group=None,
chunk_size=10, chunk_size=10,
) )
dask_model_factory = lgb.DaskLGBMRanker
else: else:
_, _, _, dX, dy, _ = _create_data( _, _, _, dX, dy, _ = _create_data(
objective=task, objective=task,
...@@ -1037,10 +1037,8 @@ def test_network_params_not_required_but_respected_if_given(client, task, output ...@@ -1037,10 +1037,8 @@ def test_network_params_not_required_but_respected_if_given(client, task, output
chunk_size=10, chunk_size=10,
) )
dg = None dg = None
if task == 'classification':
dask_model_factory = lgb.DaskLGBMClassifier dask_model_factory = task_to_dask_factory[task]
elif task == 'regression':
dask_model_factory = lgb.DaskLGBMRegressor
# rebalance data to be sure that each worker has a piece of the data # rebalance data to be sure that each worker has a piece of the data
if output == 'array': if output == 'array':
...@@ -1103,7 +1101,6 @@ def test_machines_should_be_used_if_provided(task, output): ...@@ -1103,7 +1101,6 @@ def test_machines_should_be_used_if_provided(task, output):
group=None, group=None,
chunk_size=10, chunk_size=10,
) )
dask_model_factory = lgb.DaskLGBMRanker
else: else:
_, _, _, dX, dy, _ = _create_data( _, _, _, dX, dy, _ = _create_data(
objective=task, objective=task,
...@@ -1111,10 +1108,8 @@ def test_machines_should_be_used_if_provided(task, output): ...@@ -1111,10 +1108,8 @@ def test_machines_should_be_used_if_provided(task, output):
chunk_size=10, chunk_size=10,
) )
dg = None dg = None
if task == 'classification':
dask_model_factory = lgb.DaskLGBMClassifier dask_model_factory = task_to_dask_factory[task]
elif task == 'regression':
dask_model_factory = lgb.DaskLGBMRegressor
# rebalance data to be sure that each worker has a piece of the data # rebalance data to be sure that each worker has a piece of the data
if output == 'array': if output == 'array':
...@@ -1201,17 +1196,15 @@ def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array( ...@@ -1201,17 +1196,15 @@ def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(
output='dataframe', output='dataframe',
group=None group=None
) )
model_factory = lgb.DaskLGBMRanker
else: else:
_, _, _, dX, dy, dw = _create_data( _, _, _, dX, dy, dw = _create_data(
objective=task, objective=task,
output='dataframe', output='dataframe',
) )
dg = None dg = None
if task == 'classification':
model_factory = lgb.DaskLGBMClassifier model_factory = task_to_dask_factory[task]
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor
dy = dy.to_dask_array(lengths=True) dy = dy.to_dask_array(lengths=True)
dy_col_array = dy.reshape(-1, 1) dy_col_array = dy.reshape(-1, 1)
assert len(dy_col_array.shape) == 2 and dy_col_array.shape[1] == 1 assert len(dy_col_array.shape) == 2 and dy_col_array.shape[1] == 1
...@@ -1231,10 +1224,7 @@ def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array( ...@@ -1231,10 +1224,7 @@ def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(
@pytest.mark.parametrize('task', tasks) @pytest.mark.parametrize('task', tasks)
@pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('output', data_output)
def test_init_score( def test_init_score(task, output, client):
task,
output,
client):
if task == 'ranking' and output == 'scipy_csr_matrix': if task == 'ranking' and output == 'scipy_csr_matrix':
pytest.skip('LGBMRanker is not currently tested on sparse matrices') pytest.skip('LGBMRanker is not currently tested on sparse matrices')
...@@ -1243,17 +1233,14 @@ def test_init_score( ...@@ -1243,17 +1233,14 @@ def test_init_score(
output=output, output=output,
group=None group=None
) )
model_factory = lgb.DaskLGBMRanker
else: else:
_, _, _, dX, dy, dw = _create_data( _, _, _, dX, dy, dw = _create_data(
objective=task, objective=task,
output=output, output=output,
) )
dg = None dg = None
if task == 'classification':
model_factory = lgb.DaskLGBMClassifier model_factory = task_to_dask_factory[task]
elif task == 'regression':
model_factory = lgb.DaskLGBMRegressor
params = { params = {
'n_estimators': 1, 'n_estimators': 1,
...@@ -1261,10 +1248,17 @@ def test_init_score( ...@@ -1261,10 +1248,17 @@ def test_init_score(
'time_out': 5 'time_out': 5
} }
init_score = random.random() init_score = random.random()
# init_scores must be a 1D array, even for multiclass classification
# where you need to provide 1 score per class for each row in X
# https://github.com/microsoft/LightGBM/issues/4046
size_factor = 1
if task == 'multiclass-classification':
size_factor = 3 # number of classes
if output.startswith('dataframe'): if output.startswith('dataframe'):
init_scores = dy.map_partitions(lambda x: pd.Series([init_score] * x.size)) init_scores = dy.map_partitions(lambda x: pd.Series([init_score] * x.size * size_factor))
else: else:
init_scores = da.full_like(dy, fill_value=init_score, dtype=np.float64) init_scores = dy.map_blocks(lambda x: np.repeat(init_score, x.size * size_factor))
model = model_factory(client=client, **params) model = model_factory(client=client, **params)
model.fit(dX, dy, sample_weight=dw, init_score=init_scores, group=dg) model.fit(dX, dy, sample_weight=dw, init_score=init_scores, group=dg)
# value of the root node is 0 when init_score is set # value of the root node is 0 when init_score is set
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment