# Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# See LICENSE for license information.
"""Helper functions to launch distributed tests"""

import copy
import os
from pathlib import Path
import subprocess
import sys
import time
import unittest

13
14
15
16
try:
    from paddle.base import core
except ImportError:
    from paddle.fluid import core
17
18
19
20
21
22
23
from paddle.distributed.utils.launch_utils import (
    TrainerProc,
    find_free_ports,
    get_cluster,
    watch_local_trainers,
)

# Only the test base class is part of this module's public surface; the
# launch helpers below are internal.
__all__ = ["TestDistributed"]


def get_cluster_from_args(selected_gpus):
    """Get node information for a single-node launch on the selected GPUs.

    All trainers run on ``127.0.0.1``. One free port is requested per entry
    in ``selected_gpus`` and used to build that trainer's endpoint.

    Args:
        selected_gpus: Sequence of GPU ids; its length is the number of
            trainers (and ports) to set up.

    Returns:
        The result of ``get_cluster(node_ips, node_ip, trainer_endpoints,
        selected_gpus)``; callers unpack it as ``(cluster, pod)``.

    Raises:
        RuntimeError: If ``find_free_ports`` returns ``None``.
    """
    cluster_node_ips = "127.0.0.1"
    node_ip = "127.0.0.1"

    node_ips = [x.strip() for x in cluster_node_ips.split(",")]
    # Sanity check: the current node must be listed among the cluster IPs
    # (list.index raises ValueError otherwise).
    node_ips.index(node_ip)

    free_ports = find_free_ports(len(selected_gpus))
    if free_ports is None:
        # Iterating None below would only produce an opaque TypeError; fail
        # with a message that says what actually went wrong.
        raise RuntimeError(
            f"Could not find {len(selected_gpus)} free ports for local trainers"
        )
    free_ports = list(free_ports)

    # One endpoint list per node, one "ip:port" entry per trainer.
    trainer_endpoints = [
        [f"{ip}:{port}" for port in free_ports] for ip in node_ips
    ]
    return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)


def get_gpus(selected_gpus):
    """Turn a comma-separated GPU string into a list of id strings.

    Surrounding whitespace is trimmed from each piece; ids stay strings,
    e.g. ``"0, 1"`` becomes ``["0", "1"]``.
    """
    return list(map(str.strip, selected_gpus.split(",")))


def start_local_trainers(
    cluster,
    pod,
    training_script,
    training_script_args,
    allocator_strategy="auto_growth",
):
    """Launch one subprocess per trainer in ``pod`` running ``training_script``.

    Each child inherits the current environment, minus HTTP proxy variables,
    plus the Paddle distributed variables describing its rank and endpoints.

    Args:
        cluster: Cluster object providing ``trainers_nranks()`` and
            ``trainers_endpoints()``.
        pod: Object whose ``trainers`` are launched; each trainer provides
            ``gpus``, ``rank`` and ``endpoint``.
        training_script: Path of the Python script each trainer executes.
        training_script_args: Extra command-line arguments appended after
            the script path.
        allocator_strategy: Value for ``FLAGS_allocator_strategy``. When it
            is ``"auto_growth"``, ``FLAGS_fraction_of_gpu_memory_to_use`` is
            also set to ``"0.1"``.

    Returns:
        List of ``TrainerProc`` objects, one per launched trainer.
    """
    base_env = dict(os.environ)
    # paddle broadcasts ncclUniqueId over sockets, and a proxy may make the
    # trainers unreachable, so delete the proxy settings. Setting them to ""
    # makes grpc log a "bad uri" error, so remove them entirely instead.
    base_env.pop("http_proxy", None)
    base_env.pop("https_proxy", None)

    # Make modules next to this file importable by the trainers while keeping
    # any PYTHONPATH entries the caller already relies on.
    python_path = [str(Path(__file__).resolve().parent)]
    if base_env.get("PYTHONPATH"):
        python_path.append(base_env["PYTHONPATH"])

    # Use the interpreter running this process rather than whatever "python"
    # resolves to on PATH. Build an argv list so paths containing spaces
    # survive intact.
    if os.getenv("WITH_COVERAGE", "OFF") == "ON":
        cmd = [sys.executable, "-m", "coverage", "run", "--branch", "-p"]
    else:
        cmd = [sys.executable, "-u"]
    cmd.append(training_script)
    cmd_str = " ".join(cmd)

    procs = []
    for t in pod.trainers:
        proc_env = {
            "FLAGS_selected_gpus": ",".join([str(g) for g in t.gpus]),
            "PADDLE_TRAINER_ID": f"{t.rank}",
            "PADDLE_CURRENT_ENDPOINT": f"{t.endpoint}",
            "PADDLE_TRAINERS_NUM": f"{cluster.trainers_nranks()}",
            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
            "PYTHONPATH": os.pathsep.join(python_path),
            "FLAGS_allocator_strategy": allocator_strategy,
        }
        if allocator_strategy == "auto_growth":
            proc_env["FLAGS_fraction_of_gpu_memory_to_use"] = "0.1"

        # Only the Paddle-specific variables are logged. Dumping the full
        # inherited environment could leak credentials into test logs.
        print(f"start trainer proc:{cmd_str} env:{proc_env}")

        # A fresh dict per trainer keeps one trainer's settings from leaking
        # into another's environment.
        env = {**base_env, **proc_env}
        proc = subprocess.Popen(  # pylint: disable=consider-using-with
            cmd + list(training_script_args), env=env
        )

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = t.rank
        tp.log_fn = None  # trainer output is not redirected to a log file
        tp.cmd = cmd_str
        procs.append(tp)

    return procs


class TestDistributed(unittest.TestCase):
    """Base class for distributed tests that launch local GPU trainers."""

    @staticmethod
    def run_2gpu(
        target_file_name,
        allocator_strategy="auto_growth",
    ):
        """Run ``target_file_name`` as two trainer subprocesses on GPUs 0 and 1.

        Returns without launching anything if Paddle was built without CUDA,
        or if fewer CUDA devices are visible than there are selected GPUs.
        Otherwise it blocks, polling every 3 seconds, until
        ``watch_local_trainers`` returns a falsy value.

        Args:
            target_file_name: Path of the script each trainer executes.
            allocator_strategy: Forwarded to ``start_local_trainers``.
        """
        selected_gpus = get_gpus("0,1")
        # Every selected GPU id must exist. With a single device the second
        # trainer would be pinned to a non-existent GPU, so the old "== 0"
        # check was not enough.
        if (
            not core.is_compiled_with_cuda()
            or core.get_cuda_device_count() < len(selected_gpus)
        ):
            return

        cluster, pod = get_cluster_from_args(selected_gpus)

        procs = start_local_trainers(
            cluster,
            pod,
            allocator_strategy=allocator_strategy,
            training_script=target_file_name,
            training_script_args=[],
        )

        while True:
            alive = watch_local_trainers(procs, cluster.trainers_endpoints())

            if not alive:
                print(f"Local procs complete, POD info:{pod}")
                break
            time.sleep(3)