from typing import Dict, List, Tuple, Union

import ray

from cacheflow.master.scheduler import Scheduler
from cacheflow.sequence import SequenceGroupInputs
from cacheflow.worker.worker import Worker


DeviceID = Tuple[int, str, int]  # rank, node resource (node IP), device id


class Controller:
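    """Manages the Ray workers of one pipeline stage: it launches one worker
    per assigned device and forwards the stage's outputs to the next node."""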

    def __init__(
        self,
        stage_id: int,
        stage_devices: List[DeviceID],
        world_size: int,
        tensor_parallel_size: int,
        pipeline_parallel_size: int,
        distributed_init_method: str,
        model_name: str,
        block_size: int,
        num_gpu_blocks: int,
        num_cpu_blocks: int,
        dtype: str,
        seed: int,
        model_path: str,
    ) -> None:
        self.stage_id = stage_id
        self.stage_devices = stage_devices
        self.model_name = model_name
        self.block_size = block_size
        self.num_gpu_blocks = num_gpu_blocks
        self.num_cpu_blocks = num_cpu_blocks

        # Which pipeline stage is this node assigned to?
        self.is_first_stage = stage_id == 0
        self.is_last_stage = False
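        # Whether this is the last stage is only known once the pipeline has
        # been wired up, so it is finalized later in set_next().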

        self.workers: List[Worker] = []
        for rank, node_resource, device_id in stage_devices:
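            # Launch one Ray actor per assigned device: each worker claims a
            # full GPU, and the tiny custom-resource request pins the actor
            # to the node that hosts that device.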
            worker_cls = ray.remote(num_cpus=0,
                                    num_gpus=1,
                                    resources={node_resource: 1e-5})(Worker)
            worker = worker_cls.remote(
                model_name=model_name,
                block_size=block_size,
                num_gpu_blocks=num_gpu_blocks,
                num_cpu_blocks=num_cpu_blocks,
                dtype=dtype,
                seed=seed,
                distributed_init_method=distributed_init_method,
                rank=rank,
                world_size=world_size,
                tensor_parallel_size=tensor_parallel_size,
                pipeline_parallel_size=pipeline_parallel_size,
                model_path=model_path,
            )
            self.workers.append(worker)

    def set_next(
        self,
        next_node: Union['Controller', 'Scheduler'],
    ) -> None:
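        # Nodes are chained Controller -> ... -> Controller -> Scheduler, so
        # seeing a Scheduler downstream marks this as the last stage.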
        self.next_node = next_node
        self.is_last_stage = isinstance(next_node, Scheduler)

    def execute_stage(
        self,
        input_seq_groups: List[SequenceGroupInputs],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
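        # Fan the scheduler's decisions out to every worker in this stage and
        # let them execute concurrently; collect the futures before blocking.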
        futures = []
        for worker in self.workers:
            future = worker.execute_stage.remote(
                input_seq_groups,
                blocks_to_swap_in,
                blocks_to_swap_out,
                blocks_to_copy,
            )
            futures.append(future)

        all_outputs = ray.get(futures)
        # All workers in a stage shard the same model internally but produce
        # the same sampled outputs, so their results must be identical.
        output = all_outputs[0]
        for other_output in all_outputs[1:]:
            assert output == other_output

        if self.is_last_stage:
            self.next_node.post_step(output)
        else:
            # TODO: Support pipeline parallelism.
            raise NotImplementedError(
                'Pipeline parallelism is not supported yet.')
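

# A minimal, hypothetical wiring sketch for a single-stage, single-GPU setup.
# The values below are illustrative placeholders, not defaults from this
# codebase; it assumes ray.init() has already been called and that a
# Scheduler instance named `scheduler` exists:
#
#   controller = Controller(
#       stage_id=0,
#       stage_devices=[(0, 'node:10.0.0.1', 0)],  # rank 0, one node, GPU 0
#       world_size=1,
#       tensor_parallel_size=1,
#       pipeline_parallel_size=1,
#       distributed_init_method='tcp://127.0.0.1:6000',
#       model_name='facebook/opt-125m',
#       block_size=8,
#       num_gpu_blocks=1024,
#       num_cpu_blocks=256,
#       dtype='half',
#       seed=0,
#       model_path='/path/to/model/weights',
#   )
#   controller.set_next(scheduler)  # single stage: next node is the Scheduler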