# test_consistency.py
# coding: utf-8
2
from pathlib import Path
3
4
5
6

import numpy as np
from sklearn.datasets import load_svmlight_file

7
8
import lightgbm as lgb

9
EXAMPLES_DIR = Path(__file__).absolute().parents[2] / "examples"
10

11

12
class FileLoader:
    """Helper that reads one example's config and data files for the tests below.

    Parameters
    ----------
    directory : pathlib.Path
        Directory holding the config and data files. It is combined with file
        names using the ``/`` operator.
    prefix : str
        Common file-name prefix of the data files (e.g. ``"binary"`` for
        ``binary.train``).
    config_file : str, optional
        Name of the ``key = value`` config file inside ``directory``.

    Attributes
    ----------
    params : dict
        Starts as ``{"gpu_use_dp": True}`` and is extended with every entry of
        the config file, except keys containing ``"early_stopping"``.
    """

    def __init__(self, directory, prefix, config_file="train.conf"):
        self.directory = directory
        self.prefix = prefix
        self.params = {"gpu_use_dp": True}
        with open(self.directory / config_file, "r") as f:
            for line in f:
                line = line.strip()
                # Skip blank lines and comment lines.
                if not line or line.startswith("#"):
                    continue
                # Split on the first "=" only, so a value that itself contains
                # "=" is kept whole. A line without "=" still raises ValueError.
                key, value = [token.strip() for token in line.split("=", 1)]
                if "early_stopping" in key:  # disable early_stopping
                    continue
                # These two keys are stored as int; every other value stays a string.
                self.params[key] = int(value) if key in {"num_trees", "num_threads"} else value

    def load_dataset(self, suffix, is_sparse=False):
        """Load the data file ``<prefix><suffix>`` from the example directory.

        Parameters
        ----------
        suffix : str
            File-name suffix appended to the prefix (e.g. ``".train"``).
        is_sparse : bool, optional
            If True, the file is read with ``load_svmlight_file`` (zero-based
            feature indices). Otherwise it is read with ``np.loadtxt``; column 0
            is returned as the label and the remaining columns as the features.

        Returns
        -------
        tuple
            ``(features, labels, filename)`` where ``filename`` is the path of
            the loaded file as a string.
        """
        filename = str(self.path(suffix))
        if is_sparse:
            X, Y = load_svmlight_file(filename, dtype=np.float64, zero_based=True)
            return X, Y, filename
        mat = np.loadtxt(filename, dtype=np.float64)
        return mat[:, 1:], mat[:, 0], filename

    def load_field(self, suffix):
        """Return the contents of ``<prefix><suffix>`` as loaded by ``np.loadtxt``."""
        return np.loadtxt(str(self.path(suffix)))

    def load_cpp_result(self, result_file="LightGBM_predict_result.txt"):
        """Return the contents of ``result_file`` in the example directory via ``np.loadtxt``."""
        return np.loadtxt(str(self.directory / result_file))

    def train_predict_check(self, lgb_train, X_test, X_test_fn, sk_pred):
        """Train a booster and check that three sets of predictions agree.

        A copy of ``self.params`` with ``force_row_wise`` enabled is used to
        train on ``lgb_train``. The booster then predicts from the in-memory
        ``X_test`` and from the file path ``X_test_fn``. The in-memory
        predictions must be close to both the file-based predictions and the
        caller-supplied ``sk_pred``.

        Raises
        ------
        AssertionError
            If either comparison fails ``np.testing.assert_allclose``.
        """
        params = dict(self.params)
        params["force_row_wise"] = True
        gbm = lgb.train(params, lgb_train)
        y_pred = gbm.predict(X_test)
        cpp_pred = gbm.predict(X_test_fn)
        np.testing.assert_allclose(y_pred, cpp_pred)
        np.testing.assert_allclose(y_pred, sk_pred)

    def file_load_check(self, lgb_train, name):
        """Compare ``lgb_train`` with a Dataset constructed from the file ``<prefix><name>``.

        Each listed accessor is called on both datasets and the results are
        compared field by field.

        Raises
        ------
        AssertionError
            If any field differs; the message is the accessor name.
        """
        lgb_train_f = lgb.Dataset(self.path(name), params=self.params).construct()
        for f in ("num_data", "num_feature", "get_label", "get_weight", "get_init_score", "get_group"):
            a = getattr(lgb_train, f)()
            b = getattr(lgb_train_f, f)()
            if a is None and b is None:
                continue
            if a is None:
                # Only the file-loaded dataset has this field; it must then be all ones.
                # NOTE(review): presumably a default filled in when loading from file - confirm.
                assert np.all(b == 1), f
            elif isinstance(b, (list, np.ndarray)):
                np.testing.assert_allclose(a, b)
            else:
                assert a == b, f

    def path(self, suffix):
        """Return ``directory / "<prefix><suffix>"``."""
        return self.directory / f"{self.prefix}{suffix}"
65

66

67
def test_binary():
    """Run the consistency checks on the weighted binary classification example."""
    loader = FileLoader(EXAMPLES_DIR / "binary_classification", "binary")
    train_features, train_labels, _ = loader.load_dataset(".train")
    test_features, _, test_file = loader.load_dataset(".test")
    train_weights = loader.load_field(".train.weight")

    train_set = lgb.Dataset(train_features, train_labels, params=loader.params, weight=train_weights)

    classifier = lgb.LGBMClassifier(**loader.params)
    classifier.fit(train_features, train_labels, sample_weight=train_weights)
    # Keep only the second probability column for the comparison.
    positive_proba = classifier.predict_proba(test_features)[:, 1]

    loader.train_predict_check(train_set, test_features, test_file, positive_proba)
    loader.file_load_check(train_set, ".train")
78
79
80


def test_binary_linear():
    """Run the consistency checks on the binary example using ``train_linear.conf``."""
    loader = FileLoader(EXAMPLES_DIR / "binary_classification", "binary", "train_linear.conf")
    train_features, train_labels, _ = loader.load_dataset(".train")
    test_features, _, test_file = loader.load_dataset(".test")
    train_weights = loader.load_field(".train.weight")

    train_set = lgb.Dataset(train_features, train_labels, params=loader.params, weight=train_weights)

    classifier = lgb.LGBMClassifier(**loader.params)
    classifier.fit(train_features, train_labels, sample_weight=train_weights)
    # Keep only the second probability column for the comparison.
    positive_proba = classifier.predict_proba(test_features)[:, 1]

    loader.train_predict_check(train_set, test_features, test_file, positive_proba)
    loader.file_load_check(train_set, ".train")
91
92
93


def test_multiclass():
    """Run the consistency checks on the multiclass classification example."""
    loader = FileLoader(EXAMPLES_DIR / "multiclass_classification", "multiclass")
    train_features, train_labels, _ = loader.load_dataset(".train")
    test_features, _, test_file = loader.load_dataset(".test")

    train_set = lgb.Dataset(train_features, train_labels)

    classifier = lgb.LGBMClassifier(**loader.params)
    classifier.fit(train_features, train_labels)
    # The full probability matrix is compared here, not a single column.
    class_proba = classifier.predict_proba(test_features)

    loader.train_predict_check(train_set, test_features, test_file, class_proba)
    loader.file_load_check(train_set, ".train")
103
104
105


def test_regression():
    """Run the consistency checks on the regression example, which supplies an init score."""
    loader = FileLoader(EXAMPLES_DIR / "regression", "regression")
    train_features, train_targets, _ = loader.load_dataset(".train")
    test_features, _, test_file = loader.load_dataset(".test")
    train_init_score = loader.load_field(".train.init")

    train_set = lgb.Dataset(train_features, train_targets, init_score=train_init_score)

    regressor = lgb.LGBMRegressor(**loader.params)
    regressor.fit(train_features, train_targets, init_score=train_init_score)
    sklearn_pred = regressor.predict(test_features)

    loader.train_predict_check(train_set, test_features, test_file, sklearn_pred)
    loader.file_load_check(train_set, ".train")
116
117
118


def test_lambdarank():
    """Run the consistency checks on the lambdarank example (sparse data with query groups)."""
    loader = FileLoader(EXAMPLES_DIR / "lambdarank", "rank")
    train_features, train_labels, _ = loader.load_dataset(".train", is_sparse=True)
    test_features, _, test_file = loader.load_dataset(".test", is_sparse=True)
    train_groups = loader.load_field(".train.query")

    train_set = lgb.Dataset(train_features, train_labels, group=train_groups)

    # The ranker gets a copy of the params with column-wise histograms forced on.
    ranker_params = {**loader.params, "force_col_wise": True}
    ranker = lgb.LGBMRanker(**ranker_params)
    ranker.fit(train_features, train_labels, group=train_groups)
    sklearn_pred = ranker.predict(test_features)

    loader.train_predict_check(train_set, test_features, test_file, sklearn_pred)
    loader.file_load_check(train_set, ".train")
131
132
133


def test_xendcg():
    """Run the consistency checks on the xendcg example (sparse data with query groups)."""
    loader = FileLoader(EXAMPLES_DIR / "xendcg", "rank")
    train_features, train_labels, _ = loader.load_dataset(".train", is_sparse=True)
    test_features, _, test_file = loader.load_dataset(".test", is_sparse=True)
    train_groups = loader.load_field(".train.query")

    train_set = lgb.Dataset(train_features, train_labels, group=train_groups)

    ranker = lgb.LGBMRanker(**loader.params)
    ranker.fit(train_features, train_labels, group=train_groups)
    sklearn_pred = ranker.predict(test_features)

    loader.train_predict_check(train_set, test_features, test_file, sklearn_pred)
    loader.file_load_check(train_set, ".train")