ray_utils.py
import socket
from typing import List, Optional, Tuple

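# Ray is an optional dependency: it is only needed when the workers (or the
# async engine) run through Ray, so an ImportError is deferred until
# initialize_cluster() is actually asked to use Ray.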
try:
    import ray
except ImportError:
    ray = None

from vllm.config import ParallelConfig

# rank, node resource (node IP), device id
DeviceID = Tuple[int, Optional[str], int]


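# Binding to port 0 asks the OS to pick a free ephemeral port, which is read
# back from the socket before it is closed. Note the port is released on
# return, so in principle another process could claim it before it is used.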
def get_open_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]


def initialize_cluster(
    parallel_config: ParallelConfig,
    engine_use_ray: bool = False,
    ray_address: Optional[str] = None,
) -> Tuple[str, List[List[DeviceID]]]:
    """Initialize the distributed cluster probably with Ray.

    Args:
        parallel_config: The configurations for parallel execution.
        engine_use_ray: Whether to use Ray for the async engine.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.

    Returns:
        A tuple of (`distributed_init_method`, `all_stage_devices`). The
        `distributed_init_method` is the address for initializing the
        distributed backend. `all_stage_devices` includes device IDs for
        each worker in each pipeline stage. Each device ID is a tuple of
        (rank, node resource, device id).
    """
    if parallel_config.worker_use_ray or engine_use_ray:
        if ray is None:
            raise ImportError(
                "Ray is not installed. Please install Ray to use distributed "
                "serving.")
        # Connect to a Ray cluster.
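        # With address=None, ray.init() starts a new local Ray instance;
        # an explicit address (or "auto") connects to an existing cluster.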
        ray.init(address=ray_address)

    if not parallel_config.worker_use_ray:
        # Initialize cluster locally.
        port = get_open_port()
        # We need to set up the distributed init method so that the
        # distributed Megatron code (e.g., getting the world size) works
        # correctly.
        distributed_init_method = f"tcp://localhost:{port}"
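        # A single local worker: rank 0, no node resource (None), device 0.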
        all_stage_devices = [[(0, None, 0)]]
        return distributed_init_method, all_stage_devices

    # For now, assume a uniform cluster in which every node has the same
    # number of GPUs.
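    # Each alive Ray node exposes a custom resource key of the form
    # "node:<IP address>"; collecting these keys yields one schedulable
    # resource per GPU node, and the IP can be recovered from the key.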
    valid_node_resources = []
    num_devices_per_node = None
    for node in ray.nodes():
        # Skip dead nodes and nodes without GPUs. Ray omits the "GPU" key
        # entirely for CPU-only nodes, hence the .get() with a default.
        if (not node["Alive"]) or node["Resources"].get("GPU", 0) <= 0:
            continue
        if num_devices_per_node is None:
            # Ray reports resource quantities as floats; cast to int since
            # this value is used as a device count below.
            num_devices_per_node = int(node["Resources"]["GPU"])
        else:
            assert num_devices_per_node == node["Resources"]["GPU"], (
                "The number of GPUs per node is not uniform.")
        for key in node["Resources"]:
            if key.startswith("node:"):
                valid_node_resources.append(key)

    # Verify the parallel config.
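    # A tensor-parallel group must either span whole nodes (its size a
    # multiple of the per-node GPU count) or evenly partition one node
    # (the per-node GPU count a multiple of its size); otherwise a group
    # would straddle a node boundary partway through.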
    num_nodes = len(valid_node_resources)
    if num_devices_per_node is None:
        raise ValueError(
            "No alive GPU nodes were found in the Ray cluster.")
    if parallel_config.world_size > num_nodes * num_devices_per_node:
        raise ValueError(
            "The number of required GPUs exceeds the total number of "
            "available GPUs.")
    if parallel_config.tensor_parallel_size >= num_devices_per_node:
        if parallel_config.tensor_parallel_size % num_devices_per_node != 0:
            raise ValueError(
                "The number of tensor parallelism is not divisible by the "
                "number of GPUs per node.")
    else:
        if num_devices_per_node % parallel_config.tensor_parallel_size != 0:
            raise ValueError(
                "The number of GPUs per node is not divisible by the number "
                "of tensor parallelism.")

    # Assign GPUs to pipeline stages.
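    # Ranks are assigned in order: each pipeline stage takes
    # tensor_parallel_size consecutive GPUs, advancing to the next node once
    # the current node's GPUs are exhausted. The first assigned device's node
    # IP anchors the TCP init method for the distributed backend.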
    rank = 0
    current_node_id = 0
    current_device_id = 0
    distributed_init_method = None
    all_stage_devices = []

    for _ in range(parallel_config.pipeline_parallel_size):
        stage_devices = []
        for _ in range(parallel_config.tensor_parallel_size):
            node_resource = valid_node_resources[current_node_id]
            stage_devices.append((rank, node_resource, current_device_id))
            if distributed_init_method is None:
                ip = node_resource.split("node:")[-1]
                port = get_open_port()
                distributed_init_method = f"tcp://{ip}:{port}"
            rank += 1
            current_device_id += 1
            if current_device_id >= num_devices_per_node:
                current_node_id += 1
                current_device_id = 0
        all_stage_devices.append(stage_devices)

    return distributed_init_method, all_stage_devices