Unverified commit ec1debce, authored by Nikita Titov and committed by GitHub
Browse files

[python] migrate to pathlib in distributed tests (#4443)

parent 7eac5a63
import copy
import io
import socket
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, Generator, List

import numpy as np
import pytest
from sklearn.datasets import make_blobs, make_regression
from sklearn.metrics import accuracy_score
# Directory containing this test module; all generated data/config/model
# files used by the distributed-training mockup are written here.
TESTS_DIR = Path(__file__).absolute().parent
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
...@@ -57,7 +57,7 @@ class DistributedMockup: ...@@ -57,7 +57,7 @@ class DistributedMockup:
default_train_config = { default_train_config = {
'task': 'train', 'task': 'train',
'pre_partition': True, 'pre_partition': True,
'machine_list_file': os.path.join(TESTS_DIR, 'mlist.txt'), 'machine_list_file': TESTS_DIR / 'mlist.txt',
'tree_learner': 'data', 'tree_learner': 'data',
'force_row_wise': True, 'force_row_wise': True,
'verbose': 0, 'verbose': 0,
...@@ -68,9 +68,9 @@ class DistributedMockup: ...@@ -68,9 +68,9 @@ class DistributedMockup:
# Base CLI configuration for the prediction step; values are Path objects
# rooted at TESTS_DIR and are serialized into predict.conf before running.
default_predict_config = {
    'task': 'predict',
    'data': TESTS_DIR / 'train.txt',
    'input_model': TESTS_DIR / 'model0.txt',
    'output_result': TESTS_DIR / 'predictions.txt',
}
def __init__(self, executable: str): def __init__(self, executable: str):
...@@ -78,7 +78,7 @@ class DistributedMockup: ...@@ -78,7 +78,7 @@ class DistributedMockup:
def worker_train(self, i: int) -> subprocess.CompletedProcess:
    """Start the training process on the `i`-th worker.

    Invokes the LightGBM CLI executable with that worker's config file
    (train{i}.conf) and blocks until the subprocess exits, returning
    the CompletedProcess so the caller can inspect the return code.
    """
    config_path = TESTS_DIR / f'train{i}.conf'
    cmd = [self.executable, f'config={config_path}']
    return subprocess.run(cmd)
...@@ -95,16 +95,16 @@ class DistributedMockup: ...@@ -95,16 +95,16 @@ class DistributedMockup:
if i == max_tries: if i == max_tries:
raise RuntimeError('Unable to find non-colliding ports.') raise RuntimeError('Unable to find non-colliding ports.')
self.listen_ports = list(ports) self.listen_ports = list(ports)
with open(os.path.join(TESTS_DIR, 'mlist.txt'), 'wt') as f: with open(TESTS_DIR / 'mlist.txt', 'wt') as f:
for port in self.listen_ports: for port in self.listen_ports:
f.write(f'127.0.0.1 {port}\n') f.write(f'127.0.0.1 {port}\n')
def _write_data(self, partitions: List[np.ndarray]) -> None:
    """Write all training data as train.txt and each training partition as train{i}.txt."""
    all_data = np.vstack(partitions)
    # str() around the Path: older NumPy versions reject Path objects in savetxt.
    np.savetxt(str(TESTS_DIR / 'train.txt'), all_data, delimiter=',')
    for i, partition in enumerate(partitions):
        np.savetxt(str(TESTS_DIR / f'train{i}.txt'), partition, delimiter=',')
def fit(self, partitions: List[np.ndarray], train_config: Dict = {}) -> None: def fit(self, partitions: List[np.ndarray], train_config: Dict = {}) -> None:
"""Run the distributed training process on a single machine. """Run the distributed training process on a single machine.
...@@ -142,14 +142,14 @@ class DistributedMockup: ...@@ -142,14 +142,14 @@ class DistributedMockup:
""" """
self.predict_config = copy.deepcopy(self.default_predict_config) self.predict_config = copy.deepcopy(self.default_predict_config)
self.predict_config.update(predict_config) self.predict_config.update(predict_config)
config_path = os.path.join(TESTS_DIR, 'predict.conf') config_path = TESTS_DIR / 'predict.conf'
with open(config_path, 'wt') as file: with open(config_path, 'wt') as file:
_write_dict(self.predict_config, file) _write_dict(self.predict_config, file)
cmd = [self.executable, f'config={config_path}'] cmd = [self.executable, f'config={config_path}']
result = subprocess.run(cmd) result = subprocess.run(cmd)
if result.returncode != 0: if result.returncode != 0:
raise RuntimeError raise RuntimeError
y_pred = np.loadtxt(os.path.join(TESTS_DIR, 'predictions.txt')) y_pred = np.loadtxt(str(TESTS_DIR / 'predictions.txt'))
return y_pred return y_pred
def write_train_config(self, i: int) -> None: def write_train_config(self, i: int) -> None:
...@@ -158,9 +158,9 @@ class DistributedMockup: ...@@ -158,9 +158,9 @@ class DistributedMockup:
Each worker gets a different port and piece of the data, the rest are the Each worker gets a different port and piece of the data, the rest are the
model parameters contained in `self.config`. model parameters contained in `self.config`.
""" """
with open(os.path.join(TESTS_DIR, f'train{i}.conf'), 'wt') as file: with open(TESTS_DIR / f'train{i}.conf', 'wt') as file:
output_model = os.path.join(TESTS_DIR, f'model{i}.txt') output_model = TESTS_DIR / f'model{i}.txt'
data = os.path.join(TESTS_DIR, f'train{i}.txt') data = TESTS_DIR / f'train{i}.txt'
file.write(f'output_model = {output_model}\n') file.write(f'output_model = {output_model}\n')
file.write(f'local_listen_port = {self.listen_ports[i]}\n') file.write(f'local_listen_port = {self.listen_ports[i]}\n')
file.write(f'data = {data}\n') file.write(f'data = {data}\n')
......
from pathlib import Path

# Default path to the LightGBM CLI executable: the repository root sits
# two directory levels above this tests directory.
default_exec_file = Path(__file__).absolute().parents[2] / 'lightgbm'
def pytest_addoption(parser):
    """Register the --execfile option selecting the LightGBM CLI binary.

    Defaults to `default_exec_file`; stringified because argparse/pytest
    option defaults are conventionally strings, not Path objects.
    """
    parser.addoption('--execfile', action='store', default=str(default_exec_file))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment