_test_distributed.py 7.38 KB
Newer Older
1
2
3
4
5
import copy
import io
import socket
import subprocess
from concurrent.futures import ThreadPoolExecutor
6
from pathlib import Path
7
8
9
10
11
12
13
from typing import Any, Dict, Generator, List

import numpy as np
import pytest
from sklearn.datasets import make_blobs, make_regression
from sklearn.metrics import accuracy_score

14
# Directory containing this test module; every generated artifact
# (mlist.txt, train{i}.txt/.conf, model{i}.txt, predictions.txt) is written here.
TESTS_DIR = Path(__file__).absolute().parent
15
16


17
@pytest.fixture(scope="module")
def executable(pytestconfig) -> str:
    """Return the path to the lightgbm executable.

    The path is supplied on the pytest command line through the
    ``execfile`` option.
    """
    exec_path = pytestconfig.getoption("execfile")
    return exec_path


def _find_random_open_port() -> int:
    """Find a random open port on localhost."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
26
        s.bind(("", 0))
27
        port = s.getsockname()[1]
28
    return port  # noqa: RET504
29
30
31
32
33
34
35
36


def _generate_n_ports(n: int) -> Generator[int, None, None]:
    """Lazily yield ``n`` random open ports (duplicates are possible)."""
    for _ in range(n):
        yield _find_random_open_port()


def _write_dict(d: Dict, file: io.TextIOWrapper) -> None:
    for k, v in d.items():
37
        file.write(f"{k} = {v}\n")
38
39
40
41
42
43
44


def create_data(task: str, n_samples: int = 1_000) -> np.ndarray:
    """Create the appropriate data for the task.

    The data is returned as a numpy array with the label as the first column.

    Parameters
    ----------
    task : str
        Either "binary-classification" or "regression".
    n_samples : int, optional (default=1_000)
        Number of samples to generate.

    Raises
    ------
    ValueError
        If ``task`` is not one of the supported task names.
    """
    if task == "binary-classification":
        # Two well-separated blobs so the classifier can reach 100% accuracy.
        centers = [[-4, -4], [4, 4]]
        X, y = make_blobs(n_samples, centers=centers, random_state=42)
    elif task == "regression":
        X, y = make_regression(n_samples, n_features=4, n_informative=2, random_state=42)
    else:
        # Fail loudly instead of hitting an UnboundLocalError on X/y below.
        raise ValueError(f"Unknown task: '{task}'")
    return np.hstack([y.reshape(-1, 1), X])


class DistributedMockup:
    """Simulate distributed training on a single machine.

    One lightgbm CLI process is launched per "worker", each with its own
    config file, data partition and listen port; the processes coordinate
    through the machine list file mlist.txt.
    """

    # Base training configuration; per-run options are merged on top in fit().
    default_train_config = {
        "task": "train",
        "pre_partition": True,
        "machine_list_file": TESTS_DIR / "mlist.txt",
        "tree_learner": "data",
        "force_row_wise": True,
        "verbose": 0,
        "num_boost_round": 20,
        "num_leaves": 15,
        "num_threads": 2,
    }

    # Base prediction configuration; predicts the full training set using
    # worker 0's model by default.
    default_predict_config = {
        "task": "predict",
        "data": TESTS_DIR / "train.txt",
        "input_model": TESTS_DIR / "model0.txt",
        "output_result": TESTS_DIR / "predictions.txt",
    }

    def __init__(self, executable: str):
        self.executable = executable

    def worker_train(self, i: int) -> subprocess.CompletedProcess:
        """Start the training process on the `i`-th worker."""
        config_path = TESTS_DIR / f"train{i}.conf"
        cmd = [self.executable, f"config={config_path}"]
        # check=True raises CalledProcessError on a non-zero exit status.
        return subprocess.run(cmd, check=True)

    def _set_ports(self) -> None:
        """Randomly assign a port for training to each worker and save all ports to mlist.txt."""
        ports = set(_generate_n_ports(self.n_workers))
        i = 0
        max_tries = 100
        # Ports can collide because each one is released before the next is
        # probed; retry until we have n_workers distinct ports.
        while i < max_tries and len(ports) < self.n_workers:
            n_ports_left = self.n_workers - len(ports)
            candidates = _generate_n_ports(n_ports_left)
            ports.update(candidates)
            i += 1
        # BUG FIX: test the actual goal rather than the retry counter, so a
        # successful fill on the final attempt is not mistaken for a failure.
        if len(ports) < self.n_workers:
            raise RuntimeError("Unable to find non-colliding ports.")
        self.listen_ports = list(ports)
        with open(TESTS_DIR / "mlist.txt", "wt") as f:
            for port in self.listen_ports:
                f.write(f"127.0.0.1 {port}\n")

    def _write_data(self, partitions: List[np.ndarray]) -> None:
        """Write all training data as train.txt and each training partition as train{i}.txt."""
        all_data = np.vstack(partitions)
        np.savetxt(str(TESTS_DIR / "train.txt"), all_data, delimiter=",")
        for i, partition in enumerate(partitions):
            np.savetxt(str(TESTS_DIR / f"train{i}.txt"), partition, delimiter=",")

    def fit(self, partitions: List[np.ndarray], train_config: Dict) -> None:
        """Run the distributed training process on a single machine.

        For each worker i:
            1. The i-th partition is saved as train{i}.txt.
            2. A random port is assigned for training.
            3. A configuration file train{i}.conf is created.
            4. The lightgbm binary is called with config=train{i}.conf in another thread.
            5. The trained model is saved as model{i}.txt. Each model file only differs in data and local_listen_port.
        The whole training set is saved as train.txt.
        """
        self.train_config = copy.deepcopy(self.default_train_config)
        self.train_config.update(train_config)
        self.n_workers = self.train_config["num_machines"]
        self._set_ports()
        self._write_data(partitions)
        # Labels are the first column of each partition, concatenated in order.
        self.label_ = np.hstack([partition[:, 0] for partition in partitions])
        futures = []
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            for i in range(self.n_workers):
                self.write_train_config(i)
                train_future = executor.submit(self.worker_train, i)
                futures.append(train_future)
            results = [f.result() for f in futures]
        # Defensive: worker_train uses check=True, so a failing worker raises
        # CalledProcessError in f.result() before we reach this loop.
        for result in results:
            if result.returncode != 0:
                raise RuntimeError("Error in training")

    def predict(self, predict_config: Dict[str, Any]) -> np.ndarray:
        """Compute the predictions using the model created in the fit step.

        predict_config is used to predict the training set train.txt
        The predictions are saved as predictions.txt and are then loaded to return them as a numpy array.
        """
        self.predict_config = copy.deepcopy(self.default_predict_config)
        self.predict_config.update(predict_config)
        config_path = TESTS_DIR / "predict.conf"
        with open(config_path, "wt") as file:
            _write_dict(self.predict_config, file)
        cmd = [self.executable, f"config={config_path}"]
        # check=True raises CalledProcessError on failure; the returncode
        # check below is a defensive backstop.
        result = subprocess.run(cmd, check=True)
        if result.returncode != 0:
            raise RuntimeError("Error in prediction")
        return np.loadtxt(str(TESTS_DIR / "predictions.txt"))

    def write_train_config(self, i: int) -> None:
        """Create a file train{i}.conf with the required configuration to train.

        Each worker gets a different port and piece of the data, the rest are the
        model parameters contained in `self.train_config`.
        """
        with open(TESTS_DIR / f"train{i}.conf", "wt") as file:
            output_model = TESTS_DIR / f"model{i}.txt"
            data = TESTS_DIR / f"train{i}.txt"
            file.write(f"output_model = {output_model}\n")
            file.write(f"local_listen_port = {self.listen_ports[i]}\n")
            file.write(f"data = {data}\n")
            _write_dict(self.train_config, file)


def test_classifier(executable):
    """Test the classification task."""
    num_machines = 2
    dataset = create_data(task="binary-classification")
    partitions = np.array_split(dataset, num_machines)
    train_params = {
        "objective": "binary",
        "num_machines": num_machines,
    }
    clf = DistributedMockup(executable)
    clf.fit(partitions, train_params)
    # Threshold the predicted probabilities at 0.5 to get hard labels.
    probabilities = clf.predict(predict_config={})
    hard_preds = probabilities > 0.5
    assert accuracy_score(clf.label_, hard_preds) == 1.0


def test_regressor(executable):
    """Test the regression task."""
    num_machines = 2
    dataset = create_data(task="regression")
    partitions = np.array_split(dataset, num_machines)
    train_params = {
        "objective": "regression",
        "num_machines": num_machines,
    }
    reg = DistributedMockup(executable)
    reg.fit(partitions, train_params)
    # Loose tolerances: distributed GBDT predictions need only roughly match.
    predictions = reg.predict(predict_config={})
    np.testing.assert_allclose(predictions, reg.label_, rtol=0.2, atol=50.0)