Commit f21e0efc (unverified), authored by Nikita Titov, committed by GitHub
Browse files

[dask][tests] skip Dask tests when Dask is not installed and improve imports in Dask tests (#3852)

* Update test_dask.py

* Update test_dask.py
parent 5cdaf1bd
# coding: utf-8 # coding: utf-8
"""Tests for lightgbm.dask module""" """Tests for lightgbm.dask module"""
import itertools
import os
import socket import socket
import sys from itertools import groupby
from os import getenv
from sys import platform
import lightgbm as lgb
import pytest import pytest
if not sys.platform.startswith('linux'): if not platform.startswith('linux'):
pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True) pytest.skip('lightgbm.dask is currently supported in Linux environments', allow_module_level=True)
if not lgb.compat.DASK_INSTALLED:
pytest.skip('Dask is not installed', allow_module_level=True)
import dask.array as da import dask.array as da
import dask.dataframe as dd import dask.dataframe as dd
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from scipy.stats import spearmanr from scipy.stats import spearmanr
import scipy.sparse
from dask.array.utils import assert_eq from dask.array.utils import assert_eq
from dask_ml.metrics import accuracy_score, r2_score from dask_ml.metrics import accuracy_score, r2_score
from distributed.utils_test import client, cluster_fixture, gen_cluster, loop from distributed.utils_test import client, cluster_fixture, gen_cluster, loop
from scipy.sparse import csr_matrix
from sklearn.datasets import make_blobs, make_regression from sklearn.datasets import make_blobs, make_regression
from sklearn.utils import check_random_state from sklearn.utils import check_random_state
import lightgbm
import lightgbm.dask as dlgbm
from .utils import make_ranking from .utils import make_ranking
...@@ -33,8 +33,8 @@ data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]] ...@@ -33,8 +33,8 @@ data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]
group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50] group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50]
pytestmark = [ pytestmark = [
pytest.mark.skipif(os.getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'), pytest.mark.skipif(getenv('TASK', '') == 'mpi', reason='Fails to run with MPI interface'),
pytest.mark.skipif(os.getenv('TASK', '') == 'gpu', reason='Fails to run with GPU interface') pytest.mark.skipif(getenv('TASK', '') == 'gpu', reason='Fails to run with GPU interface')
] ]
...@@ -51,7 +51,7 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs) ...@@ -51,7 +51,7 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
X, y, g = make_ranking(n_samples=n_samples, random_state=42, **kwargs) X, y, g = make_ranking(n_samples=n_samples, random_state=42, **kwargs)
rnd = np.random.RandomState(42) rnd = np.random.RandomState(42)
w = rnd.rand(X.shape[0]) * 0.01 w = rnd.rand(X.shape[0]) * 0.01
g_rle = np.array([len(list(grp)) for _, grp in itertools.groupby(g)]) g_rle = np.array([len(list(grp)) for _, grp in groupby(g)])
if output == 'dataframe': if output == 'dataframe':
# add target, weight, and group to DataFrame so that partitions abide by group boundaries. # add target, weight, and group to DataFrame so that partitions abide by group boundaries.
...@@ -115,7 +115,7 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size ...@@ -115,7 +115,7 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size
dy = dd.from_pandas(y_df, chunksize=chunk_size) dy = dd.from_pandas(y_df, chunksize=chunk_size)
dw = dd.from_array(weights, chunksize=chunk_size) dw = dd.from_array(weights, chunksize=chunk_size)
elif output == 'scipy_csr_matrix': elif output == 'scipy_csr_matrix':
dX = da.from_array(X, chunks=(chunk_size, X.shape[1])).map_blocks(scipy.sparse.csr_matrix) dX = da.from_array(X, chunks=(chunk_size, X.shape[1])).map_blocks(csr_matrix)
dy = da.from_array(y, chunks=chunk_size) dy = da.from_array(y, chunks=chunk_size)
dw = da.from_array(weights, chunk_size) dw = da.from_array(weights, chunk_size)
else: else:
...@@ -137,7 +137,7 @@ def test_classifier(output, centers, client, listen_port): ...@@ -137,7 +137,7 @@ def test_classifier(output, centers, client, listen_port):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
dask_classifier = dlgbm.DaskLGBMClassifier( dask_classifier = lgb.DaskLGBMClassifier(
time_out=5, time_out=5,
local_listen_port=listen_port, local_listen_port=listen_port,
**params **params
...@@ -148,7 +148,7 @@ def test_classifier(output, centers, client, listen_port): ...@@ -148,7 +148,7 @@ def test_classifier(output, centers, client, listen_port):
s1 = accuracy_score(dy, p1) s1 = accuracy_score(dy, p1)
p1 = p1.compute() p1 = p1.compute()
local_classifier = lightgbm.LGBMClassifier(**params) local_classifier = lgb.LGBMClassifier(**params)
local_classifier.fit(X, y, sample_weight=w) local_classifier.fit(X, y, sample_weight=w)
p2 = local_classifier.predict(X) p2 = local_classifier.predict(X)
p2_proba = local_classifier.predict_proba(X) p2_proba = local_classifier.predict_proba(X)
...@@ -176,7 +176,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port): ...@@ -176,7 +176,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
dask_classifier = dlgbm.DaskLGBMClassifier( dask_classifier = lgb.DaskLGBMClassifier(
time_out=5, time_out=5,
local_listen_port=listen_port, local_listen_port=listen_port,
tree_learner='data', tree_learner='data',
...@@ -185,7 +185,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port): ...@@ -185,7 +185,7 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client) dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
preds_with_contrib = dask_classifier.predict(dX, pred_contrib=True).compute() preds_with_contrib = dask_classifier.predict(dX, pred_contrib=True).compute()
local_classifier = lightgbm.LGBMClassifier(**params) local_classifier = lgb.LGBMClassifier(**params)
local_classifier.fit(X, y, sample_weight=w) local_classifier.fit(X, y, sample_weight=w)
local_preds_with_contrib = local_classifier.predict(X, pred_contrib=True) local_preds_with_contrib = local_classifier.predict(X, pred_contrib=True)
...@@ -222,7 +222,7 @@ def test_training_does_not_fail_on_port_conflicts(client): ...@@ -222,7 +222,7 @@ def test_training_does_not_fail_on_port_conflicts(client):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 12400)) s.bind(('127.0.0.1', 12400))
dask_classifier = dlgbm.DaskLGBMClassifier( dask_classifier = lgb.DaskLGBMClassifier(
time_out=5, time_out=5,
local_listen_port=12400, local_listen_port=12400,
n_estimators=5, n_estimators=5,
...@@ -250,7 +250,7 @@ def test_classifier_local_predict(client, listen_port): ...@@ -250,7 +250,7 @@ def test_classifier_local_predict(client, listen_port):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
dask_classifier = dlgbm.DaskLGBMClassifier( dask_classifier = lgb.DaskLGBMClassifier(
time_out=5, time_out=5,
local_port=listen_port, local_port=listen_port,
**params **params
...@@ -258,7 +258,7 @@ def test_classifier_local_predict(client, listen_port): ...@@ -258,7 +258,7 @@ def test_classifier_local_predict(client, listen_port):
dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client) dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
p1 = dask_classifier.to_local().predict(dX) p1 = dask_classifier.to_local().predict(dX)
local_classifier = lightgbm.LGBMClassifier(**params) local_classifier = lgb.LGBMClassifier(**params)
local_classifier.fit(X, y, sample_weight=w) local_classifier.fit(X, y, sample_weight=w)
p2 = local_classifier.predict(X) p2 = local_classifier.predict(X)
...@@ -280,7 +280,7 @@ def test_regressor(output, client, listen_port): ...@@ -280,7 +280,7 @@ def test_regressor(output, client, listen_port):
"random_state": 42, "random_state": 42,
"num_leaves": 10 "num_leaves": 10
} }
dask_regressor = dlgbm.DaskLGBMRegressor( dask_regressor = lgb.DaskLGBMRegressor(
time_out=5, time_out=5,
local_listen_port=listen_port, local_listen_port=listen_port,
tree='data', tree='data',
...@@ -292,7 +292,7 @@ def test_regressor(output, client, listen_port): ...@@ -292,7 +292,7 @@ def test_regressor(output, client, listen_port):
s1 = r2_score(dy, p1) s1 = r2_score(dy, p1)
p1 = p1.compute() p1 = p1.compute()
local_regressor = lightgbm.LGBMRegressor(**params) local_regressor = lgb.LGBMRegressor(**params)
local_regressor.fit(X, y, sample_weight=w) local_regressor.fit(X, y, sample_weight=w)
s2 = local_regressor.score(X, y) s2 = local_regressor.score(X, y)
p2 = local_regressor.predict(X) p2 = local_regressor.predict(X)
...@@ -319,7 +319,7 @@ def test_regressor_pred_contrib(output, client, listen_port): ...@@ -319,7 +319,7 @@ def test_regressor_pred_contrib(output, client, listen_port):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
dask_regressor = dlgbm.DaskLGBMRegressor( dask_regressor = lgb.DaskLGBMRegressor(
time_out=5, time_out=5,
local_listen_port=listen_port, local_listen_port=listen_port,
tree_learner='data', tree_learner='data',
...@@ -328,7 +328,7 @@ def test_regressor_pred_contrib(output, client, listen_port): ...@@ -328,7 +328,7 @@ def test_regressor_pred_contrib(output, client, listen_port):
dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client) dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client)
preds_with_contrib = dask_regressor.predict(dX, pred_contrib=True).compute() preds_with_contrib = dask_regressor.predict(dX, pred_contrib=True).compute()
local_regressor = lightgbm.LGBMRegressor(**params) local_regressor = lgb.LGBMRegressor(**params)
local_regressor.fit(X, y, sample_weight=w) local_regressor.fit(X, y, sample_weight=w)
local_preds_with_contrib = local_regressor.predict(X, pred_contrib=True) local_preds_with_contrib = local_regressor.predict(X, pred_contrib=True)
...@@ -357,7 +357,7 @@ def test_regressor_quantile(output, client, listen_port, alpha): ...@@ -357,7 +357,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
"n_estimators": 10, "n_estimators": 10,
"num_leaves": 10 "num_leaves": 10
} }
dask_regressor = dlgbm.DaskLGBMRegressor( dask_regressor = lgb.DaskLGBMRegressor(
local_listen_port=listen_port, local_listen_port=listen_port,
tree_learner_type='data_parallel', tree_learner_type='data_parallel',
**params **params
...@@ -366,7 +366,7 @@ def test_regressor_quantile(output, client, listen_port, alpha): ...@@ -366,7 +366,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
p1 = dask_regressor.predict(dX).compute() p1 = dask_regressor.predict(dX).compute()
q1 = np.count_nonzero(y < p1) / y.shape[0] q1 = np.count_nonzero(y < p1) / y.shape[0]
local_regressor = lightgbm.LGBMRegressor(**params) local_regressor = lgb.LGBMRegressor(**params)
local_regressor.fit(X, y, sample_weight=w) local_regressor.fit(X, y, sample_weight=w)
p2 = local_regressor.predict(X) p2 = local_regressor.predict(X)
q2 = np.count_nonzero(y < p2) / y.shape[0] q2 = np.count_nonzero(y < p2) / y.shape[0]
...@@ -381,7 +381,7 @@ def test_regressor_quantile(output, client, listen_port, alpha): ...@@ -381,7 +381,7 @@ def test_regressor_quantile(output, client, listen_port, alpha):
def test_regressor_local_predict(client, listen_port): def test_regressor_local_predict(client, listen_port):
X, y, _, dX, dy, dw = _create_data('regression', output='array') X, y, _, dX, dy, dw = _create_data('regression', output='array')
dask_regressor = dlgbm.DaskLGBMRegressor( dask_regressor = lgb.DaskLGBMRegressor(
local_listen_port=listen_port, local_listen_port=listen_port,
random_state=42, random_state=42,
n_estimators=10, n_estimators=10,
...@@ -419,7 +419,7 @@ def test_ranker(output, client, listen_port, group): ...@@ -419,7 +419,7 @@ def test_ranker(output, client, listen_port, group):
"num_leaves": 20, "num_leaves": 20,
"min_child_samples": 1 "min_child_samples": 1
} }
dask_ranker = dlgbm.DaskLGBMRanker( dask_ranker = lgb.DaskLGBMRanker(
time_out=5, time_out=5,
local_listen_port=listen_port, local_listen_port=listen_port,
tree_learner_type='data_parallel', tree_learner_type='data_parallel',
...@@ -429,7 +429,7 @@ def test_ranker(output, client, listen_port, group): ...@@ -429,7 +429,7 @@ def test_ranker(output, client, listen_port, group):
rnkvec_dask = dask_ranker.predict(dX) rnkvec_dask = dask_ranker.predict(dX)
rnkvec_dask = rnkvec_dask.compute() rnkvec_dask = rnkvec_dask.compute()
local_ranker = lightgbm.LGBMRanker(**params) local_ranker = lgb.LGBMRanker(**params)
local_ranker.fit(X, y, sample_weight=w, group=g) local_ranker.fit(X, y, sample_weight=w, group=g)
rnkvec_local = local_ranker.predict(X) rnkvec_local = local_ranker.predict(X)
...@@ -451,7 +451,7 @@ def test_ranker_local_predict(output, client, listen_port, group): ...@@ -451,7 +451,7 @@ def test_ranker_local_predict(output, client, listen_port, group):
group=group group=group
) )
dask_ranker = dlgbm.DaskLGBMRanker( dask_ranker = lgb.DaskLGBMRanker(
time_out=5, time_out=5,
local_listen_port=listen_port, local_listen_port=listen_port,
tree_learner='data', tree_learner='data',
...@@ -475,7 +475,7 @@ def test_find_open_port_works(): ...@@ -475,7 +475,7 @@ def test_find_open_port_works():
worker_ip = '127.0.0.1' worker_ip = '127.0.0.1'
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((worker_ip, 12400)) s.bind((worker_ip, 12400))
new_port = dlgbm._find_open_port( new_port = lgb.dask._find_open_port(
worker_ip=worker_ip, worker_ip=worker_ip,
local_listen_port=12400, local_listen_port=12400,
ports_to_skip=set() ports_to_skip=set()
...@@ -486,7 +486,7 @@ def test_find_open_port_works(): ...@@ -486,7 +486,7 @@ def test_find_open_port_works():
s_1.bind((worker_ip, 12400)) s_1.bind((worker_ip, 12400))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_2: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_2:
s_2.bind((worker_ip, 12401)) s_2.bind((worker_ip, 12401))
new_port = dlgbm._find_open_port( new_port = lgb.dask._find_open_port(
worker_ip=worker_ip, worker_ip=worker_ip,
local_listen_port=12400, local_listen_port=12400,
ports_to_skip=set() ports_to_skip=set()
...@@ -502,11 +502,11 @@ def test_errors(c, s, a, b): ...@@ -502,11 +502,11 @@ def test_errors(c, s, a, b):
df = dd.demo.make_timeseries() df = dd.demo.make_timeseries()
df = df.map_partitions(f, meta=df._meta) df = df.map_partitions(f, meta=df._meta)
with pytest.raises(Exception) as info: with pytest.raises(Exception) as info:
yield dlgbm._train( yield lgb.dask._train(
client=c, client=c,
data=df, data=df,
label=df.x, label=df.x,
params={}, params={},
model_factory=lightgbm.LGBMClassifier model_factory=lgb.LGBMClassifier
) )
assert 'foo' in str(info.value) assert 'foo' in str(info.value)
Markdown is supported
Attaching a file: 0%. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment