"""A Neuron worker class."""

from typing import List, Optional, Tuple

import torch
import torch.distributed

from vllm.config import VllmConfig
from vllm.distributed import (ensure_model_parallel_initialized,
                              init_distributed_environment)
from vllm.model_executor import set_random_seed
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.sequence import ExecuteModelRequest
from vllm.worker.cache_engine import CacheEngine
from vllm.worker.neuron_model_runner import NeuronModelRunner
from vllm.worker.worker_base import (LocalOrDistributedWorkerBase,
                                     LoraNotSupportedWorkerBase, WorkerBase,
                                     WorkerInput)


class NeuronWorker(LoraNotSupportedWorkerBase, LocalOrDistributedWorkerBase):
    """A worker class that executes the model on a group of neuron cores.

    Tensor parallelism is handled by transformers-neuronx from a single
    process, so this worker initializes a one-process distributed
    environment. Requests that ask for KV-cache swapping/copying or for
    lookahead slots are rejected in ``execute_model``.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = True,
    ) -> None:
        """Create the worker and its ``NeuronModelRunner``.

        Args:
            vllm_config: Engine configuration. It is forwarded to
                ``WorkerBase.__init__`` and to the model runner.
            local_rank: Node-local rank. It is stored on the worker, but
                ``init_distributed_environment`` in this class always uses
                rank 0.
            rank: Global rank. It is stored on the worker (see
                ``local_rank``).
            distributed_init_method: Init method passed through to the
                distributed-environment setup.
            is_driver_worker: Whether this worker acts as the driver.
        """
        WorkerBase.__init__(self, vllm_config=vllm_config)
        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method

        if self.model_config.trust_remote_code:
            # Imported lazily: the cached HF modules are only initialized
            # when remote code is trusted.
            from vllm.utils import init_cached_hf_modules
            init_cached_hf_modules()

        self.model_runner: NeuronModelRunner = NeuronModelRunner(
            vllm_config=vllm_config)
        self.is_driver_worker = is_driver_worker

    def execute_model(
        self,
        execute_model_req: Optional[ExecuteModelRequest] = None,
    ) -> Optional[List[SamplerOutput]]:
        """Validate a request for the Neuron backend, then run one step.

        Delegates to ``LocalOrDistributedWorkerBase.execute_model`` once the
        request is known to need no cache operations and no lookahead slots.

        Raises:
            AssertionError: If ``execute_model_req`` is None, requests any
                block swap-in/swap-out/copy, or has a non-zero
                ``num_lookahead_slots``.
        """
        # NOTE(review): these checks are ``assert`` statements, so they are
        # stripped under ``python -O``; unsupported cache operations would
        # then be silently ignored because ``execute_worker`` is a no-op.
        assert execute_model_req is not None
        assert (not execute_model_req.blocks_to_swap_in
                and not execute_model_req.blocks_to_swap_out
                and not execute_model_req.blocks_to_copy), (
                    "Cache operations are not supported for Neuron backend.")
        assert execute_model_req.num_lookahead_slots == 0, (
            "lookahead not supported for Neuron backend.")
        output = LocalOrDistributedWorkerBase.execute_model(
            self, execute_model_req)
        return output

    def init_device(self) -> None:
        """Set up the distributed environment and apply the model seed."""
        self.init_distributed_environment()

        # Set random seed.
        set_random_seed(self.model_config.seed)

    def load_model(self) -> None:
        """Load model weights through the Neuron model runner."""
        self.model_runner.load_model()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks.

        Swapping is not yet supported, so always return num_cpu_blocks=0.

        We configure num_gpu_blocks to be equal to max_num_seqs.

        Returns:
            ``(num_gpu_blocks, num_cpu_blocks)``.
        """
        # Set the number of GPU blocks to be the same as the maximum number of
        # sequences that can be processed in a single batch. This is equivalent
        # to schedule without PagedAttention.
        num_gpu_blocks = self.scheduler_config.max_num_seqs

        # Swap not yet supported with Neuron backend.
        num_cpu_blocks = 0

        return num_gpu_blocks, num_cpu_blocks

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache.

        Only records the block counts on ``cache_config``; no cache tensors
        are allocated here.

        Raises:
            AssertionError: If the counts differ from the values returned by
                ``determine_num_available_blocks``.
        """

        # Different values are not tested.
        assert num_cpu_blocks == 0
        assert num_gpu_blocks == self.scheduler_config.max_num_seqs

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

    @property
    def do_metadata_broadcast(self) -> bool:
        """Always False: this worker never broadcasts execution metadata."""
        return False

    @property
    def kv_cache(self) -> Optional[List[List[torch.Tensor]]]:
        """Always None: this worker exposes no KV-cache tensors."""
        return None

    @property
    def cache_engines(self) -> Optional[List[CacheEngine]]:
        """Always None: this worker uses no ``CacheEngine`` instances."""
        return None

    @torch.inference_mode()
    def prepare_worker_input(
            self, execute_model_req: ExecuteModelRequest) -> WorkerInput:
        """Build a ``WorkerInput`` carrying only the sequence-group count."""
        return WorkerInput(num_seq_groups=len(
            execute_model_req.seq_group_metadata_list))

    def execute_worker(self, worker_input: WorkerInput) -> None:
        """No-op: ``execute_model`` rejects requests with cache operations,
        so there is no per-step worker work to perform."""
        pass

    def get_cache_block_size_bytes(self) -> int:
        """Determine the size in bytes of a cache block.

        This is required for speculative decoding; it is not yet implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def init_distributed_environment(self) -> None:
        """Neuron uses transformers-neuronx for tensor parallelism.

        It has only one process to control multiple devices.
        vLLM still needs the environment initialized when TP/PP > 1,
        so we initialize a distributed environment with one process.
        """
        # The bare name below resolves to the function imported from
        # ``vllm.distributed``, not to this method, so this is not recursion.
        init_distributed_environment(
            world_size=1,
            rank=0,
            local_rank=0,
            distributed_init_method=self.distributed_init_method,
            backend="gloo",
        )
        # Both parallel sizes are 1 because only one process exists
        # (presumably tensor- and pipeline-parallel size — confirm against
        # vllm.distributed).
        ensure_model_parallel_initialized(
            1,
            1,
        )