# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from dataclasses import dataclass

import torch

from vllm.pooling_params import PoolingParams
8
from vllm.tasks import PoolingTask
9
from vllm.utils.platform_utils import is_pin_memory_available
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# Result of `is_pin_memory_available()`, evaluated once at import time.
# `build_pooling_cursor` passes it as `pin_memory=` when allocating its
# CPU-side cumulative-sum buffer.
pin_memory = is_pin_memory_available()


@dataclass
class PoolingCursor:
    """Per-request token offsets for one batch of pooling requests.

    Every field has one entry per request and all fields share the same
    ordering, so slicing the cursor slices each field with the same indices.
    """

    # Positions of the requests (``0..n-1`` when built by `build_pooling_cursor`).
    index: list[int]
    # Exclusive prefix sum of the scheduled-token counts: the offset of each
    # request's first scheduled token. Lives on the device the cursor was
    # built for.
    first_token_indices_gpu: torch.Tensor
    # Inclusive offset of each request's last scheduled token
    # (inclusive prefix sum minus one). Same device as above.
    last_token_indices_gpu: torch.Tensor
    # Prompt length of each request (CPU tensor).
    prompt_lens_cpu: torch.Tensor
    # Number of tokens scheduled for each request in this step (CPU tensor).
    num_scheduled_tokens_cpu: torch.Tensor

    def __getitem__(self, indices: slice) -> "PoolingCursor":
        """Return a new cursor restricted to the requests selected by `indices`.

        Args:
            indices: Slice applied identically to every field.

        Returns:
            A `PoolingCursor` whose fields are the sliced fields of this one.
        """
        return PoolingCursor(
            index=self.index[indices],
            first_token_indices_gpu=self.first_token_indices_gpu[indices],
            last_token_indices_gpu=self.last_token_indices_gpu[indices],
            prompt_lens_cpu=self.prompt_lens_cpu[indices],
            num_scheduled_tokens_cpu=self.num_scheduled_tokens_cpu[indices],
        )

    def is_partial_prefill(self) -> bool:
        """Return True if any request's scheduled-token count differs from
        its prompt length, i.e. not every prompt is scheduled in full."""
        # `not <tensor>` converts the 0-d result of torch.all to a Python bool.
        return not torch.all(self.prompt_lens_cpu == self.num_scheduled_tokens_cpu)
33
34
35
36
37


@dataclass
class PoolingMetadata:
    """Tensors for pooling."""

    # Prompt length of each request.
    prompt_lens: torch.Tensor  # CPU Tensor
    # Prompt token ids, one row per request (indexed as ``[i, :num]`` below);
    # None when token ids were not requested.
    prompt_token_ids: torch.Tensor | None
    # One `PoolingParams` per request; each must carry a non-None `task`.
    pooling_params: list[PoolingParams]
    # Token offsets for the current step; populated by `build_pooling_cursor`.
    pooling_cursor: PoolingCursor | None = None

    def __post_init__(self) -> None:
        """Collect each request's pooling task into `self.tasks`.

        Raises:
            AssertionError: If any entry of `pooling_params` has `task=None`.
        """
        pooling_params = self.pooling_params

        # The walrus filter drops params without a task; the length check
        # below then fails if anything was dropped.
        tasks: list[PoolingTask] = [
            task
            for pooling_param in pooling_params
            if (task := pooling_param.task) is not None
        ]
        assert len(pooling_params) == len(tasks)

        self.tasks = tasks

    def __getitem__(self, indices: slice) -> "PoolingMetadata":
        """Return the metadata for the requests selected by `indices`.

        Optional fields (`prompt_token_ids`, `pooling_cursor`) stay None when
        they are None on this instance; otherwise they are sliced as well.
        """
        return PoolingMetadata(
            prompt_lens=self.prompt_lens[indices],
            prompt_token_ids=None
            if self.prompt_token_ids is None
            else self.prompt_token_ids[indices],
            pooling_params=self.pooling_params[indices],
            pooling_cursor=None
            if self.pooling_cursor is None
            else self.pooling_cursor[indices],
        )

    def get_prompt_token_ids(self) -> list[torch.Tensor]:
        """Return each request's prompt token ids, cut to its prompt length.

        Returns:
            One tensor per request: row ``i`` of `prompt_token_ids` truncated
            to the first ``prompt_lens[i]`` entries.

        Raises:
            AssertionError: If `prompt_token_ids` is None.
        """
        prompt_token_ids = self.prompt_token_ids
        assert prompt_token_ids is not None, (
            "Please set `requires_token_ids=True` in `get_pooling_updates`"
        )

        return [prompt_token_ids[i, :num] for i, num in enumerate(self.prompt_lens)]

    def build_pooling_cursor(
        self, num_scheduled_tokens: list[int], device: torch.device
    ) -> None:
        """Compute and store `pooling_cursor` for the current step.

        Delegates to the module-level `build_pooling_cursor` using this
        instance's `prompt_lens`, replacing any previously stored cursor.

        Args:
            num_scheduled_tokens: Tokens scheduled per request; must have the
                same length as `prompt_lens`.
            device: Device that receives the token-index tensors.
        """
        self.pooling_cursor = build_pooling_cursor(
            num_scheduled_tokens, self.prompt_lens, device
        )
82
83


84
85
86
def build_pooling_cursor(
    num_scheduled_tokens: list[int], prompt_lens: torch.Tensor, device: torch.device
) -> PoolingCursor:
    """Build the `PoolingCursor` for one scheduling step.

    Args:
        num_scheduled_tokens: Number of tokens scheduled for each request.
        prompt_lens: Prompt length of each request; stored on the cursor
            unchanged.
        device: Device that receives the first/last token index tensors.

    Returns:
        A `PoolingCursor` whose first/last token indices are the exclusive
        prefix sum and (inclusive prefix sum - 1) of `num_scheduled_tokens`.

    Raises:
        AssertionError: If the two inputs have different lengths.
    """
    assert len(prompt_lens) == len(num_scheduled_tokens)

    n_seq = len(num_scheduled_tokens)
    index = list(range(n_seq))
    # Pin the dtype: with an empty list torch.tensor would otherwise fall back
    # to the default floating dtype instead of matching the int64 buffer below.
    num_scheduled_tokens_cpu = torch.tensor(
        num_scheduled_tokens, dtype=torch.int64, device="cpu"
    )

    # cumsum[0] stays 0 and cumsum[i + 1] is the total number of tokens
    # scheduled for requests 0..i, written in place through the `out` view.
    cumsum = torch.zeros(
        n_seq + 1, dtype=torch.int64, pin_memory=pin_memory, device="cpu"
    )
    torch.cumsum(num_scheduled_tokens_cpu, dim=0, out=cumsum[1:])
    # non_blocking requests an asynchronous host-to-device copy where supported.
    cumsum = cumsum.to(device, non_blocking=True)

    return PoolingCursor(
        index=index,
        first_token_indices_gpu=cumsum[:n_seq],
        last_token_indices_gpu=cumsum[1:] - 1,
        prompt_lens_cpu=prompt_lens,
        num_scheduled_tokens_cpu=num_scheduled_tokens_cpu,
    )