# Tests for the lightgbm.dask module (test_dask.py).
# coding: utf-8
import os
3
import socket
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import sys

import pytest
if not sys.platform.startswith("linux"):
    pytest.skip("lightgbm.dask is currently supported in Linux environments", allow_module_level=True)

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import scipy.sparse
from dask.array.utils import assert_eq
from dask_ml.metrics import accuracy_score, r2_score
from distributed.utils_test import client, cluster_fixture, gen_cluster, loop
from sklearn.datasets import make_blobs, make_regression

import lightgbm
import lightgbm.dask as dlgbm

# Input representations exercised by the parametrized tests; each value is a
# valid ``output`` argument for ``_create_data``.
data_output = [
    'array',
    'scipy_csr_matrix',
    'dataframe',
]

# ``centers`` arguments for ``make_blobs``: a two-center and a three-center
# classification problem.
data_centers = [
    [[-4, -4], [4, 4]],
    [[-4, -4], [4, 4], [-4, 4]],
]

# Module-wide marks: every test here is skipped when the TASK environment
# variable is "mpi".
pytestmark = [
    pytest.mark.skipif(
        os.getenv("TASK", "") == "mpi",
        reason="Fails to run with MPI interface",
    ),
]


@pytest.fixture()
def listen_port():
    """Return the next base listen port from a session-wide counter.

    The counter lives on the function object (``listen_port.port``) and is
    advanced by 10 on every call, so successive tests receive 13010, 13020,
    ... -- presumably to keep tests from colliding on the same ports.
    """
    next_port = listen_port.port + 10
    listen_port.port = next_port
    return next_port


# Initial value of the counter advanced by the ``listen_port`` fixture.
listen_port.port = 13000


def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size=50):
    """Build a synthetic dataset in both local and dask form.

    Parameters
    ----------
    objective : str
        ``'classification'`` (data from ``make_blobs``) or ``'regression'``
        (data from ``make_regression``).
    n_samples : int
        Number of rows to generate.
    centers : int or list
        Forwarded to ``make_blobs``; unused for regression.
    output : str
        Container type of the dask copies: ``'array'``, ``'dataframe'`` or
        ``'scipy_csr_matrix'`` (a dask array whose blocks are CSR matrices).
    chunk_size : int
        Rows per dask chunk / partition.

    Returns
    -------
    tuple
        ``(X, y, weights, dX, dy, dw)``: local features, targets and sample
        weights, followed by their dask counterparts.

    Raises
    ------
    ValueError
        If ``objective`` or ``output`` is not recognised.
    """
    if objective == 'regression':
        X, y = make_regression(n_samples=n_samples, random_state=42)
    elif objective == 'classification':
        X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42)
    else:
        raise ValueError(objective)

    # Small, deterministic per-row sample weights.
    weights = np.random.RandomState(42).random(X.shape[0]) * 0.01
    n_features = X.shape[1]

    if output == 'dataframe':
        columns = ['feature_%d' % i for i in range(n_features)]
        dX = dd.from_pandas(pd.DataFrame(X, columns=columns), chunksize=chunk_size)
        dy = dd.from_pandas(pd.Series(y, name='target'), chunksize=chunk_size)
        dw = dd.from_array(weights, chunksize=chunk_size)
    elif output in ('array', 'scipy_csr_matrix'):
        # Chunk by rows only; every chunk carries all feature columns.
        dX = da.from_array(X, chunks=(chunk_size, n_features))
        if output == 'scipy_csr_matrix':
            dX = dX.map_blocks(scipy.sparse.csr_matrix)
        dy = da.from_array(y, chunks=chunk_size)
        dw = da.from_array(weights, chunks=chunk_size)
    else:
        raise ValueError("Unknown output type %s" % output)

    return X, y, weights, dX, dy, dw


@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('centers', data_centers)
def test_classifier(output, centers, client, listen_port):
    """DaskLGBMClassifier should agree with a local LGBMClassifier.

    Both models are trained with the same hyperparameters and sample weights.
    The test compares accuracy, hard predictions (against each other and
    against the true labels) and, loosely, the class probabilities.
    """
    X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers)

    dask_classifier = dlgbm.DaskLGBMClassifier(
        time_out=5,
        local_listen_port=listen_port,
        n_estimators=10,
        num_leaves=10
    )
    dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
    p1 = dask_classifier.predict(dX)
    p1_proba = dask_classifier.predict_proba(dX).compute()
    # Score against the lazy dask prediction before materialising it locally.
    s1 = accuracy_score(dy, p1)
    p1 = p1.compute()

    local_classifier = lightgbm.LGBMClassifier(n_estimators=10, num_leaves=10)
    local_classifier.fit(X, y, sample_weight=w)
    p2 = local_classifier.predict(X)
    p2_proba = local_classifier.predict_proba(X)
    s2 = local_classifier.score(X, y)

    assert_eq(s1, s2)
    assert_eq(p1, p2)
    assert_eq(y, p1)
    assert_eq(y, p2)
    # Probabilities are only compared loosely -- presumably because
    # distributed training is not bit-identical to local training.
    assert_eq(p1_proba, p2_proba, atol=0.3)


def test_training_does_not_fail_on_port_conflicts(client):
    """Training must still succeed when the requested listen port is taken.

    Port 12400 is bound by a local socket for the whole test while the
    classifier is asked to listen on that same port. ``fit`` is repeated
    several times, and each run must produce a booster.
    """
    _, _, _, dX, dy, dw = _create_data('classification', output='array')

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Occupy the port the classifier is configured to use.
        s.bind(('127.0.0.1', 12400))

        dask_classifier = dlgbm.DaskLGBMClassifier(
            time_out=5,
            local_listen_port=12400,
            n_estimators=5,
            num_leaves=5
        )
        for _ in range(5):
            dask_classifier.fit(
                X=dX,
                y=dy,
                sample_weight=dw,
                client=client
            )
            assert dask_classifier.booster_


def test_classifier_local_predict(client, listen_port):
    """The model from ``to_local()`` should predict like a locally trained one.

    A DaskLGBMClassifier is trained on dask data and converted with
    ``to_local()``. Its predictions must equal those of an LGBMClassifier
    trained on the same in-memory data, and both must match the true labels.
    """
    X, y, w, dX, dy, dw = _create_data('classification', output='array')

    dask_classifier = dlgbm.DaskLGBMClassifier(
        time_out=5,
        local_listen_port=listen_port,
        n_estimators=10,
        num_leaves=10
    )
    dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
    p1 = dask_classifier.to_local().predict(dX)

    local_classifier = lightgbm.LGBMClassifier(n_estimators=10, num_leaves=10)
    local_classifier.fit(X, y, sample_weight=w)
    p2 = local_classifier.predict(X)

    assert_eq(p1, p2)
    assert_eq(y, p1)
    assert_eq(y, p2)


@pytest.mark.parametrize('output', data_output)
def test_regressor(output, client, listen_port):
    """DaskLGBMRegressor should roughly agree with a local LGBMRegressor.

    R^2 scores are compared for every ``output`` except ``'dataframe'``.
    For dataframes the dask-ml score is not computed here (presumably
    unsupported for that input -- confirm). Both models' predictions are
    checked loosely against the true targets.
    """
    X, y, w, dX, dy, dw = _create_data('regression', output=output)

    dask_regressor = dlgbm.DaskLGBMRegressor(
        time_out=5,
        local_listen_port=listen_port,
        seed=42,
        num_leaves=10
    )
    dask_regressor = dask_regressor.fit(dX, dy, client=client, sample_weight=dw)
    p1 = dask_regressor.predict(dX)
    if output != 'dataframe':
        s1 = r2_score(dy, p1)
    p1 = p1.compute()

    local_regressor = lightgbm.LGBMRegressor(seed=42, num_leaves=10)
    local_regressor.fit(X, y, sample_weight=w)
    s2 = local_regressor.score(X, y)
    p2 = local_regressor.predict(X)

    # Scores should be the same
    if output != 'dataframe':
        assert_eq(s1, s2, atol=.01)

    # Predictions should be roughly the same
    assert_eq(y, p1, rtol=1., atol=100.)
    assert_eq(y, p2, rtol=1., atol=50.)


@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('alpha', [.1, .5, .9])
def test_regressor_quantile(output, client, listen_port, alpha):
    """Quantile objective: about ``alpha`` of targets should fall below predictions.

    A distributed and a local model are trained with the same hyperparameters
    (10 trees, 10 leaves). For each model, the empirical fraction of targets
    below the predictions must be within 0.2 of ``alpha``.
    """
    X, y, w, dX, dy, dw = _create_data('regression', output=output)

    dask_regressor = dlgbm.DaskLGBMRegressor(
        local_listen_port=listen_port,
        seed=42,
        objective='quantile',
        alpha=alpha,
        n_estimators=10,
        num_leaves=10
    )
    dask_regressor = dask_regressor.fit(dX, dy, client=client, sample_weight=dw)
    p1 = dask_regressor.predict(dX).compute()
    q1 = np.count_nonzero(y < p1) / y.shape[0]

    # Same hyperparameters as the dask model so both are trained alike.
    local_regressor = lightgbm.LGBMRegressor(
        seed=42,
        objective='quantile',
        alpha=alpha,
        n_estimators=10,
        num_leaves=10
    )
    local_regressor.fit(X, y, sample_weight=w)
    p2 = local_regressor.predict(X)
    q2 = np.count_nonzero(y < p2) / y.shape[0]

    # Quantiles should be right
    np.testing.assert_allclose(q1, alpha, atol=0.2)
    np.testing.assert_allclose(q2, alpha, atol=0.2)


def test_regressor_local_predict(client, listen_port):
    """``to_local()`` must reproduce the dask regressor's predictions and score.

    The dask model's predictions and dask-ml R^2 on dask data are compared
    with the predictions and ``score`` of its ``to_local()`` conversion on the
    equivalent in-memory data.
    """
    X, y, _, dX, dy, dw = _create_data('regression', output='array')

    dask_regressor = dlgbm.DaskLGBMRegressor(
        local_listen_port=listen_port,
        seed=42,
        n_estimators=10,
        num_leaves=10
    )
    dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client)
    # Convert once and reuse the local model for predict and score.
    local_regressor = dask_regressor.to_local()

    p1 = dask_regressor.predict(dX)
    p2 = local_regressor.predict(X)
    s1 = r2_score(dy, p1)
    p1 = p1.compute()
    s2 = local_regressor.score(X, y)

    # Predictions and scores should be the same
    assert_eq(p1, p2)
    assert_eq(s1, s2)


def test_find_open_port_works():
    """``_find_open_port`` should move past ports that are already bound.

    With 12400 occupied it must return 12401. With both 12400 and 12401
    occupied it must return 12402.
    """
    worker_ip = '127.0.0.1'

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((worker_ip, 12400))
        new_port = dlgbm._find_open_port(
            worker_ip=worker_ip,
            local_listen_port=12400,
            ports_to_skip=set()
        )
        assert new_port == 12401

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_1, \
            socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_2:
        s_1.bind((worker_ip, 12400))
        s_2.bind((worker_ip, 12401))
        new_port = dlgbm._find_open_port(
            worker_ip=worker_ip,
            local_listen_port=12400,
            ports_to_skip=set()
        )
        assert new_port == 12402


@gen_cluster(client=True, timeout=None)
def test_errors(c, s, a, b):
    """An exception raised on a worker must surface from ``_train``.

    Every partition of the input dataframe raises ``Exception('foo')``.
    Training must fail, and the propagated error must carry that message.
    """
    def f(part):
        raise Exception('foo')

    df = dd.demo.make_timeseries()
    df = df.map_partitions(f, meta=df._meta)
    with pytest.raises(Exception) as info:
        yield dlgbm._train(c, df, df.x, params={}, model_factory=lightgbm.LGBMClassifier)
    # Must run after the ``with`` block: a statement placed after the raising
    # call inside ``pytest.raises`` is never executed.
    assert 'foo' in str(info.value)