parallel_run.py 4.84 KB
Newer Older
yuguo's avatar
yuguo committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import asyncio
import os
import argparse
from subprocess import PIPE, STDOUT
import glob
import sys
import time
import socket
from contextlib import closing
import uuid


def gen_cmds(cmd=None, dir=None, doctest=False):
    """Build the shell command lines used to run the test files under ``dir``.

    Args:
        cmd: Command prefix (typically the Python interpreter) placed before
            each file path.
        dir: Directory that is searched for test files.
        doctest: When false, every ``test_*.py`` directly inside ``dir`` is
            turned into ``"<cmd> <path> --failfast --verbose"``. When true,
            ``dir`` is searched recursively for ``*.py`` files; paths
            containing ``"compatible"``, ``"single_client"`` or
            ``"unittest.py"`` are skipped, and only files whose text contains
            ``"import doctest"`` are turned into ``"<cmd> <path> -v"``.

    Returns:
        A list of command strings. In doctest mode the list is also printed.
    """
    if not doctest:
        paths = glob.glob(os.path.join(dir, "test_*.py"), recursive=False)
        return ["{} {} --failfast --verbose".format(cmd, p) for p in paths]

    excluded_tokens = ("compatible", "single_client", "unittest.py")
    with_doctest = []
    for p in glob.glob(os.path.join(dir, "**/*.py"), recursive=True):
        if any(token in p for token in excluded_tokens):
            continue
        # Read as UTF-8 (the default encoding of Python sources) instead of
        # the locale encoding, and tolerate undecodable bytes: this is only a
        # substring scan, so one oddly encoded file must not abort discovery.
        with open(p, encoding="utf-8", errors="replace") as f:
            content = f.read()
        if "import doctest" in content:
            with_doctest.append("{} {} -v".format(cmd, p))
    print(with_doctest)
    return with_doctest


def find_free_port():
    """Return a TCP port number the OS reports as free on localhost.

    A temporary IPv4 stream socket is bound to port 0 so that the OS picks
    the port; the socket is closed before returning, so the port is only
    known to have been free at the time of the call.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("localhost", 0))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = sock.getsockname()
        return port
    finally:
        sock.close()


def split_and_print(prefix, text):
    """Print ``text`` with ``prefix`` and a space in front of every line.

    Line endings are kept as they are in ``text``; the result is written with
    a single flushed ``print`` call (which appends its own newline).
    """
    tagged_lines = [f"{prefix} {line}" for line in text.splitlines(keepends=True)]
    print("".join(tagged_lines), flush=True)


def everyN(l: list, n: int):
    """Yield consecutive slices of ``l`` holding at most ``n`` items each.

    The last slice is shorter when ``len(l)`` is not a multiple of ``n``;
    an empty ``l`` yields nothing.
    """
    yield from (l[offset : offset + n] for offset in range(0, len(l), n))


def contains_oom_info(txt: str):
    """Return True when ``txt`` contains one of the known resource-error markers.

    The match is a case-sensitive substring check against "memory", "Memory",
    "CUDNN" and "ALLOC".
    """
    markers = ("memory", "Memory", "CUDNN", "ALLOC")
    return any(marker in txt for marker in markers)


def should_retry(txt: str):
    """Tell whether a command that produced output ``txt`` is worth re-running.

    Currently the only retry criterion is that the output looks like an
    out-of-memory style failure, as judged by ``contains_oom_info``.
    """
    looks_like_oom = contains_oom_info(txt)
    return looks_like_oom


def print_out(prefix: str = "", content: str = ""):
    """Print every line of ``content`` preceded by ``[prefix]`` and a space.

    ``content`` is split on "\\n"; empty content therefore still prints one
    (empty) tagged line.
    """
    tag = f"[{prefix}]"
    for line in content.split("\n"):
        print(tag, line)


async def spawn_shell_and_check(cmd: str = None, gpu_id: int = -1, check: bool = False):
    """Run ``cmd`` in a shell subprocess and collect its combined output.

    The child inherits the current environment plus:
      * ``CUDA_VISIBLE_DEVICES``: ``"-1"`` when ``ONEFLOW_TEST_CPU_ONLY`` is
        set in the environment, otherwise ``str(gpu_id)``;
      * ``ONEFLOW_TEST_MASTER_PORT``: a port obtained from ``find_free_port``;
      * ``ONEFLOW_TEST_LOG_DIR``: a fresh ``./unittest-log-<uuid4>`` path.

    Args:
        cmd: Shell command line to execute.
        gpu_id: Device index exposed to the child via ``CUDA_VISIBLE_DEVICES``.
        check: When true, any non-zero exit status is an error. When false,
            a non-zero exit is tolerated if the output satisfies
            ``should_retry`` so that the caller can run the command again.

    Returns:
        A dict with keys ``"returncode"``, ``"cmd"`` and ``"stdout"`` (the
        decoded output; stderr is merged into stdout).

    Raises:
        RuntimeError: The command exited non-zero and either ``check`` is
            true or the output does not qualify for a retry. The output is
            printed before raising.
    """
    is_cpu_only = os.getenv("ONEFLOW_TEST_CPU_ONLY")
    print(f"[gpu={gpu_id}]", cmd)
    child_env = dict(
        os.environ,
        CUDA_VISIBLE_DEVICES="-1" if is_cpu_only else str(gpu_id),
        ONEFLOW_TEST_MASTER_PORT=str(find_free_port()),
        ONEFLOW_TEST_LOG_DIR="./unittest-log-" + str(uuid.uuid4()),
    )
    p = await asyncio.create_subprocess_shell(
        cmd, stdout=PIPE, stderr=STDOUT, env=child_env,
    )
    # stderr is redirected into stdout, so the second item is always None.
    stdout_data, _ = await p.communicate()
    # Child output is arbitrary bytes; a strict decode would raise
    # UnicodeDecodeError on invalid UTF-8 and abort every sibling command
    # awaited together with this one.
    decoded = stdout_data.decode(errors="replace")
    if p.returncode != 0 and (check or not should_retry(decoded)):
        print_out(prefix=cmd, content=decoded)
        raise RuntimeError(cmd)
    return {"returncode": p.returncode, "cmd": cmd, "stdout": decoded}


async def run_cmds(
    cmds, gpu_num=0, timeout=10, chunk=1, verbose=False, per_gpu_process_num=1
):
    """Run ``cmds`` concurrently in batches and return the ones that failed.

    Commands are processed in batches of ``per_gpu_process_num * gpu_num``.
    Within a batch, every group of ``gpu_num`` commands is spread over device
    ids ``0 .. gpu_num - 1``, and all commands of the batch are awaited
    together. When ``ONEFLOW_TEST_CPU_ONLY`` is set in the environment,
    ``gpu_num`` is replaced by ``os.cpu_count()``.

    Args:
        cmds: Command strings to execute.
        gpu_num: Number of devices to spread commands over; must be > 0.
        timeout: Accepted but not used by this function.
        chunk: Accepted but not used by this function.
        verbose: Accepted but not used by this function.
        per_gpu_process_num: How many commands share one device id within a
            batch. Failures are checked strictly only when this equals 1.

    Returns:
        The list of commands whose process returned a non-zero exit status
        without raising. The output of successful commands is printed.
    """
    if os.getenv("ONEFLOW_TEST_CPU_ONLY"):
        gpu_num = os.cpu_count()
    assert gpu_num > 0
    strict = per_gpu_process_num == 1
    batch_size = per_gpu_process_num * gpu_num
    failed_cmds = []
    for batch in everyN(cmds, batch_size):
        pending = []
        for gpu_group in everyN(batch, gpu_num):
            # The position inside a group doubles as the device id.
            for device_id, cmd in enumerate(gpu_group):
                pending.append(
                    spawn_shell_and_check(cmd=cmd, gpu_id=device_id, check=strict)
                )
        outcomes = await asyncio.gather(*pending)
        for outcome in outcomes:
            if outcome["returncode"] == 0:
                print_out(prefix=outcome["cmd"], content=outcome["stdout"])
            else:
                failed_cmds.append(outcome["cmd"])
    return failed_cmds


if __name__ == "__main__":
    # Command-line entry point: discover the test commands, run them in
    # several passes with decreasing per-GPU concurrency, and report the
    # total elapsed time.
    parser = argparse.ArgumentParser()
    parser.add_argument("--gpu_num", type=int, required=True, default=0)
    parser.add_argument("--dir", type=str, required=True, default=".")
    parser.add_argument("--cmd", type=str, required=False, default=sys.executable)
    parser.add_argument("--timeout", type=int, required=False, default=2)
    parser.add_argument("--chunk", type=int, required=True)
    parser.add_argument("--verbose", action="store_true", required=False, default=False)
    parser.add_argument("--doctest", action="store_true", required=False, default=False)
    args = parser.parse_args()
    cmds = gen_cmds(cmd=args.cmd, dir=args.dir, doctest=args.doctest)
    start = time.time()
    # Create and install the loop explicitly: relying on get_event_loop() to
    # create one implicitly outside a running loop is deprecated, and the loop
    # must be closed once all passes are done.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    PER_GPU_PROCESS_NUMS = [12, 8, 2, 1]
    is_cpu_only = os.getenv("ONEFLOW_TEST_CPU_ONLY")
    if is_cpu_only:
        PER_GPU_PROCESS_NUMS = [1]
    try:
        for per_gpu_process_num in PER_GPU_PROCESS_NUMS:
            print("[per_gpu_process_num]", per_gpu_process_num)
            # Each pass returns the commands that failed; only those are fed
            # into the next pass, which runs fewer processes per GPU.
            cmds = loop.run_until_complete(
                run_cmds(
                    cmds,
                    gpu_num=args.gpu_num,
                    timeout=args.timeout,
                    chunk=args.chunk,
                    verbose=args.verbose,
                    per_gpu_process_num=per_gpu_process_num,
                )
            )
    finally:
        loop.close()
    elapsed = time.time() - start
    elapsed_time_txt = time.strftime("elapsed: %H:%M:%S", time.gmtime(elapsed))
    print(elapsed_time_txt)