neuron_worker.py 3.08 KB
Newer Older
1
"""A Neuron worker class."""
2
from typing import List, Optional, Tuple
3
4
5
6

import torch
import torch.distributed

7
8
from vllm.config import (CacheConfig, DeviceConfig, ModelConfig,
                         ParallelConfig, SchedulerConfig)
9
10
from vllm.model_executor import set_random_seed
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
11
from vllm.worker.neuron_model_runner import NeuronModelRunner
12
from vllm.worker.worker_base import LoraNotSupportedWorkerBase
13
14


15
class NeuronWorker(LoraNotSupportedWorkerBase):
    """Worker that runs the model on a group of neuron cores.

    The worker stores the configuration objects it receives and delegates
    model loading and execution to a ``NeuronModelRunner``.
    """

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        cache_config: CacheConfig,
    ) -> None:
        """Keep the given configs on the worker and build the model runner.

        ``cache_config`` is only stored on the worker; it is not forwarded to
        the model runner.
        """
        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.cache_config = cache_config

        self.model_runner = NeuronModelRunner(
            model_config,
            parallel_config,
            scheduler_config,
            device_config,
        )

    def init_device(self) -> None:
        """Prepare the worker; currently this only seeds the RNG."""
        seed = self.model_config.seed
        set_random_seed(seed)

    def load_model(self):
        """Ask the model runner to load the model."""
        self.model_runner.load_model()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Report how many KV blocks are available as ``(gpu, cpu)``.

        The GPU block count is set to ``max_num_seqs`` from the scheduler
        config, i.e. one block per sequence that can be processed in a single
        batch, which is equivalent to scheduling without PagedAttention.

        Swapping is not yet supported with the Neuron backend, so the CPU
        block count is always 0.
        """
        max_batch_seqs = self.scheduler_config.max_num_seqs
        return max_batch_seqs, 0

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Record the KV cache block counts on the cache config.

        Only the values produced by ``determine_num_available_blocks`` are
        accepted; other values are not tested.
        """
        expected_gpu_blocks = self.scheduler_config.max_num_seqs
        assert num_cpu_blocks == 0
        assert num_gpu_blocks == expected_gpu_blocks

        cache_cfg = self.cache_config
        cache_cfg.num_gpu_blocks = num_gpu_blocks
        cache_cfg.num_cpu_blocks = num_cpu_blocks

    @torch.inference_mode()
    def execute_model(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
    ) -> Optional[SamplerOutput]:
        """Run the model runner on the given sequence groups.

        When the list is empty the runner is not invoked and an empty dict is
        returned; otherwise the result of the runner's ``execute_model`` is
        returned unchanged.
        """
        # Nothing to run: skip the model call entirely.
        # NOTE(review): ``{}`` does not match the annotated
        # ``Optional[SamplerOutput]``; the value is kept because callers may
        # depend on it -- confirm before changing.
        if len(seq_group_metadata_list) == 0:
            return {}

        return self.model_runner.execute_model(seq_group_metadata_list)

    def get_cache_block_size_bytes(self) -> int:
        """Return the size in bytes of one cache block.

        Needed for speculative decoding; not yet implemented for this worker,
        so calling it always raises ``NotImplementedError``.
        """
        raise NotImplementedError